This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d6732ffd19 Logical table query quota enforcement - SSE (#15839) d6732ffd19 is described below commit d6732ffd1943a6a11358ad36645e07769e1c84ff Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Tue May 27 12:57:28 2025 +0530 Logical table query quota enforcement - SSE (#15839) --- ...okerResourceOnlineOfflineStateModelFactory.java | 1 + .../BrokerUserDefinedMessageHandlerFactory.java | 1 + .../HelixExternalViewBasedQueryQuotaManager.java | 117 +++++++++++++-------- .../pinot/broker/queryquota/QueryQuotaManager.java | 7 ++ .../requesthandler/BaseBrokerRequestHandler.java | 3 +- .../BaseSingleStageBrokerRequestHandler.java | 23 ++-- .../pinot/common/metadata/ZKMetadataProvider.java | 5 +- .../controller/helix/ControllerRequestClient.java | 23 ++++ .../helix/core/PinotHelixResourceManager.java | 9 +- .../pinot/controller/helix/ControllerTest.java | 12 ++- .../tests/QueryQuotaClusterIntegrationTest.java | 94 ++++++++++++++++- .../BaseLogicalTableIntegrationTest.java | 34 +++--- ...hTwoOfflineOneRealtimeTableIntegrationTest.java | 2 +- 13 files changed, 241 insertions(+), 90 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index cc2f2f406a..d9e83d4e4c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -80,6 +80,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact try { if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable); + _queryQuotaManager.initOrUpdateLogicalTableQueryQuota(physicalOrLogicalTable); } else { _routingManager.buildRouting(physicalOrLogicalTable); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java index 033a126ea6..81ea3d0d4f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java @@ -154,6 +154,7 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac @Override public HelixTaskResult handleMessage() { _routingManager.buildRoutingForLogicalTable(_logicalTableName); + _queryQuotaManager.initOrUpdateLogicalTableQueryQuota(_logicalTableName); HelixTaskResult result = new HelixTaskResult(); result.setSuccess(true); return result; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 925fbc4860..7bb8641271 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.zookeeper.data.Stat; @@ -175,6 +176,20 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } } + public void initOrUpdateLogicalTableQueryQuota(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + if (logicalTableConfig == null) { + LOGGER.info("No query quota to update since logical table config is null"); + return; + } + + LOGGER.info("Initializing rate limiter for logical table {}", logicalTableName); + + ExternalView brokerResourceEV = getBrokerResource(); + Stat stat = _propertyStore.getStat(constructLogicalTableConfigPath(logicalTableName), AccessOption.PERSISTENT); + createOrUpdateRateLimiter(logicalTableName, brokerResourceEV, logicalTableConfig.getQuotaConfig(), stat); + } + public void initOrUpdateTableQueryQuota(String tableNameWithType) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); ExternalView brokerResourceEV = getBrokerResource(); @@ -195,24 +210,25 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan LOGGER.info("Initializing rate limiter for table {}", tableNameWithType); // Create rate limiter if query quota config is specified. - createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, tableConfig.getQuotaConfig()); + Stat stat = _propertyStore.getStat(constructTableConfigPath(tableNameWithType), AccessOption.PERSISTENT); + createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, tableConfig.getQuotaConfig(), stat); } /** * Drop table query quota. - * @param tableNameWithType table name with type. + * @param physicalOrLogicalTable physical or logical table name. */ - public void dropTableQueryQuota(String tableNameWithType) { - LOGGER.info("Dropping rate limiter for table {}", tableNameWithType); - removeRateLimiter(tableNameWithType); + public void dropTableQueryQuota(String physicalOrLogicalTable) { + LOGGER.info("Dropping rate limiter for table {}", physicalOrLogicalTable); + removeRateLimiter(physicalOrLogicalTable); } /** Remove or update rate limiter if another table with the same raw table name but different type is still using * the quota config. - * @param tableNameWithType table name with type + * @param physicalOrLogicalTable physical or logical table name. */ - private void removeRateLimiter(String tableNameWithType) { - _rateLimiterMap.remove(tableNameWithType); + private void removeRateLimiter(String physicalOrLogicalTable) { + _rateLimiterMap.remove(physicalOrLogicalTable); } /** @@ -230,26 +246,27 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan /** * Create or update a rate limiter for a table. - * @param tableNameWithType table name with table type. + * @param physicalOrLogicalTableName physical or logical table name. * @param brokerResource broker resource which stores all the broker states of each table. * @param quotaConfig quota config of the table. + * @param tableStat stat of the table config. */ - private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView brokerResource, - QuotaConfig quotaConfig) { + private void createOrUpdateRateLimiter(String physicalOrLogicalTableName, ExternalView brokerResource, + QuotaConfig quotaConfig, Stat tableStat) { if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { - LOGGER.info("No qps config specified for table: {}", tableNameWithType); - buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType); + LOGGER.info("No qps config specified for table: {}", physicalOrLogicalTableName); + buildEmptyOrResetRateLimiterInQueryQuotaEntity(physicalOrLogicalTableName); return; } if (brokerResource == null) { - LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", tableNameWithType); + LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", physicalOrLogicalTableName); // It could be possible that brokerResourceEV is null due to ZK connection issue. // In this case, the rate limiter should not be reset. Simply exit the method would be sufficient. return; } - Map<String, String> stateMap = brokerResource.getStateMap(tableNameWithType); + Map<String, String> stateMap = brokerResource.getStateMap(physicalOrLogicalTableName); int otherOnlineBrokerCount = 0; // If stateMap is null, that means this broker is the first broker for this table. @@ -263,26 +280,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } int onlineCount = otherOnlineBrokerCount + 1; - LOGGER.info("The number of online brokers for table {} is {}", tableNameWithType, onlineCount); + LOGGER.info("The number of online brokers for table {} is {}", physicalOrLogicalTableName, onlineCount); // Get the dynamic rate double overallRate = quotaConfig.getMaxQPS(); - - // Get stat from property store - String tableConfigPath = constructTableConfigPath(tableNameWithType); - Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); double perBrokerRate = overallRate / onlineCount; - QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); + QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(physicalOrLogicalTableName); if (queryQuotaEntity == null) { queryQuotaEntity = new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), - new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion()); - _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, tableStat.getVersion()); + _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity); LOGGER.info( "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online " - + "broker instances: {}. Table config stat version: {}", tableNameWithType, overallRate, perBrokerRate, - onlineCount, stat.getVersion()); + + "broker instances: {}. Table config stat version: {}", physicalOrLogicalTableName, overallRate, + perBrokerRate, onlineCount, tableStat.getVersion()); } else { RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter(); double previousRate = -1; @@ -297,14 +310,14 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } queryQuotaEntity.setNumOnlineBrokers(onlineCount); queryQuotaEntity.setOverallRate(overallRate); - queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); + queryQuotaEntity.setTableConfigStatVersion(tableStat.getVersion()); LOGGER.info( "Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New " + "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", - tableNameWithType, overallRate, previousRate, perBrokerRate, onlineCount, stat.getVersion()); + physicalOrLogicalTableName, overallRate, previousRate, perBrokerRate, onlineCount, tableStat.getVersion()); } - addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); - addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); + addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); + addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); if (isQueryRateLimitDisabled()) { LOGGER.info("Query rate limiting is currently disabled for this broker. So it won't take effect immediately."); } @@ -536,19 +549,19 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query * quota entity. */ - private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String tableNameWithType) { - QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); + private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String physicalOrLogicalTableName) { + QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(physicalOrLogicalTableName); if (queryQuotaEntity == null) { // Create an QueryQuotaEntity object without setting a rate limiter. queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0); - _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); + _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity); } else { // Set rate limiter to null for an existing QueryQuotaEntity object. queryQuotaEntity.setRateLimiter(null); } - addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); - addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); + addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); + addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); } /** @@ -680,6 +693,16 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan return offlineQuotaOk && realtimeQuotaOk; } + @Override + public boolean acquireLogicalTable(String logicalTableName) { + QueryQuotaEntity logicalTableQueryQuotaEntity = _rateLimiterMap.get(logicalTableName); + if (logicalTableQueryQuotaEntity != null) { + LOGGER.debug("Trying to acquire token for logical table: {}", logicalTableName); + return tryAcquireToken(logicalTableName, logicalTableQueryQuotaEntity); + } + return true; + } + /** * Try to acquire token from rate limiter. Emit the utilization of the qps quota if broker metric isn't null. * @param resourceName resource name to acquire. @@ -749,7 +772,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan int numRebuilt = 0; for (Iterator<Map.Entry<String, QueryQuotaEntity>> it = _rateLimiterMap.entrySet().iterator(); it.hasNext(); ) { Map.Entry<String, QueryQuotaEntity> entry = it.next(); - String tableNameWithType = entry.getKey(); + String physicalOrLogicalTableName = entry.getKey(); QueryQuotaEntity queryQuotaEntity = entry.getValue(); if (queryQuotaEntity.getRateLimiter() == null) { // No rate limiter set, skip this table. @@ -757,9 +780,9 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan } // Get number of online brokers. - Map<String, String> stateMap = currentBrokerResourceEV.getStateMap(tableNameWithType); + Map<String, String> stateMap = currentBrokerResourceEV.getStateMap(physicalOrLogicalTableName); if (stateMap == null) { - LOGGER.info("No broker resource for Table {}. Removing its rate limit.", tableNameWithType); + LOGGER.info("No broker resource for Table {}. Removing its rate limit.", physicalOrLogicalTableName); it.remove(); continue; } @@ -773,10 +796,14 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan int onlineBrokerCount = otherOnlineBrokerCount + 1; // Get stat from property store - String tableConfigPath = constructTableConfigPath(tableNameWithType); - Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); + String physicalOrLogicalTableConfigPath = + ZKMetadataProvider.isTableConfigExists(_propertyStore, physicalOrLogicalTableName) + ? constructTableConfigPath(physicalOrLogicalTableName) + : constructLogicalTableConfigPath(physicalOrLogicalTableName); + Stat stat = _propertyStore.getStat(physicalOrLogicalTableConfigPath, AccessOption.PERSISTENT); if (stat == null) { - LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.", tableNameWithType); + LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.", + physicalOrLogicalTableName); it.remove(); continue; } @@ -790,10 +817,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan double overallRate; // Get latest quota config only if stat don't match. if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) { - QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(tableNameWithType); + QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(physicalOrLogicalTableName); if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { LOGGER.info("No query quota config or the config is invalid for Table {}. Removing its rate limit.", - tableNameWithType); + physicalOrLogicalTableName); it.remove(); continue; } @@ -810,7 +837,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); LOGGER.info("Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New " + "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}.", - tableNameWithType, overallRate, previousRate, latestRate, onlineBrokerCount, stat.getVersion()); + physicalOrLogicalTableName, overallRate, previousRate, latestRate, onlineBrokerCount, stat.getVersion()); numRebuilt++; } } @@ -931,4 +958,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan private String constructTableConfigPath(String tableNameWithType) { return "/CONFIGS/TABLE/" + tableNameWithType; } + + private String constructLogicalTableConfigPath(String tableName) { + return "/LOGICAL/TABLE/" + tableName; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java index 70c3ef7588..eb3de472d9 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java @@ -27,6 +27,13 @@ public interface QueryQuotaManager { */ boolean acquire(String tableName); + /** + * Try to acquire a quota for the given logical table. + * @param logicalTableName Logical table name + * @return {@code true} if the table quota has not been reached, {@code false} otherwise + */ + boolean acquireLogicalTable(String logicalTableName); + /** * Try to acquire a quota for the given database. * @param databaseName database name diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 2367334497..2a344876c3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -285,7 +285,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { * @return true if the query was successfully cancelled, false otherwise. */ protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, - HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception; + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception; protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { statistics.setNumRowsResultSet(response.getNumRowsResultSet()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 7901772362..ab013085cb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -366,8 +366,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ // Compile the request into PinotQuery long compilationStartTimeNs = System.nanoTime(); CompileResult compileResult = - compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders, - accessControl); + compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders, + accessControl); if (compileResult._errorOrLiteralOnlyBrokerResponse != null) { /* @@ -406,8 +406,17 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } // Validate QPS - if (hasExceededQPSQuota(database, physicalTableNames, requestContext)) { - String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query); + if (!_queryQuotaManager.acquireDatabase(database)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); + LOGGER.info(errorMessage); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); + return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); + } + if (!_queryQuotaManager.acquireLogicalTable(tableName)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for table: %s.", requestId, query, tableName); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); } @@ -815,9 +824,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { return new CompileResult(new BrokerResponseNative(QueryErrorCode.SQL_PARSING, "It seems that the query is only supported by the multi-stage query engine, please retry the query " - + "using " - + "the multi-stage query engine " - + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")); + + "using " + + "the multi-stage query engine " + + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")); } else { return new CompileResult( new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage())); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 029f5c3491..eff90fcf49 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -884,7 +884,8 @@ public class ZKMetadataProvider { return propertyStore.exists(constructPropertyStorePathForLogical(tableName), AccessOption.PERSISTENT); } - public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { - return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName), AccessOption.PERSISTENT); + public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) { + return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableNameWithType), + AccessOption.PERSISTENT); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index e619cbf008..7969d2aa86 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; @@ -137,6 +138,17 @@ public class ControllerRequestClient { } } + public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException( + _httpClient.sendJsonPostRequest(new URI(_controllerRequestURLBuilder.forLogicalTableCreate()), + logicalTableConfig.toJsonString(), _headers)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public void updateTableConfig(TableConfig tableConfig) throws IOException { try { @@ -148,6 +160,17 @@ public class ControllerRequestClient { } } + public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPutRequest( + new URI(_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableConfig.getTableName())), + logicalTableConfig.toJsonString(), _headers)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public void toggleTableState(String tableName, TableType type, boolean enable) throws IOException { try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 892e3fbc11..09bd904aee 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -180,7 +180,6 @@ import org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; @@ -2152,13 +2151,7 @@ public class PinotHelixResourceManager { updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); } - TimeBoundaryConfig oldTimeBoundaryConfig = oldLogicalTableConfig.getTimeBoundaryConfig(); - TimeBoundaryConfig newTimeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); - // compare the old and new time boundary config and send message if they are different - if ((oldTimeBoundaryConfig != null && !oldTimeBoundaryConfig.equals(newTimeBoundaryConfig)) - || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) { - sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName()); - } + sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName()); LOGGER.info("Updated logical table {}: Successfully updated table", tableName); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index cfd0f3f6cc..cb04aed946 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -407,7 +407,7 @@ public class ControllerTest { .setBrokerTenant(brokerTenant) .setRefOfflineTableName(offlineTableName) .setRefRealtimeTableName(realtimeTableName) - .setQuotaConfig(new QuotaConfig(null, "999")) + .setQuotaConfig(new QuotaConfig(null, "99999")) .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L)) .setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames))) .setPhysicalTableConfigMap(physicalTableConfigMap); @@ -752,11 +752,21 @@ public class ControllerTest { getControllerRequestClient().addTableConfig(tableConfig); } + public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + getControllerRequestClient().addLogicalTableConfig(logicalTableConfig); + } + public void updateTableConfig(TableConfig tableConfig) throws IOException { getControllerRequestClient().updateTableConfig(tableConfig); } + public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + getControllerRequestClient().updateLogicalTableConfig(logicalTableConfig); + } + public void toggleTableState(String tableName, TableType type, boolean enable) throws IOException { getControllerRequestClient().toggleTableState(tableName, type, enable); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index f3945bcf1f..f91a4a9148 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.util.Iterator; +import java.util.List; import java.util.Properties; import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest; @@ -35,10 +36,12 @@ import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -76,6 +79,12 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest TableConfig tableConfig = createOfflineTableConfig(); addTableConfig(tableConfig); + // Create and upload schema and logical table + schema.setSchemaName(getLogicalTableName()); + addSchema(schema); + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(); + addLogicalTableConfig(logicalTableConfig); + Properties properties = new Properties(); properties.put(FAIL_ON_EXCEPTIONS, "FALSE"); _pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory() @@ -96,9 +105,11 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest setQueryQuotaForApplication(null); addQueryQuotaToDatabaseConfig(null); addQueryQuotaToTableConfig(null); + addQueryQuotaToLogicalTableConfig(null); _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0); verifyQuotaUpdate(0); + verifyQuotaUpdateWithTableName(0, getLogicalTableName()); } @Test @@ -258,6 +269,48 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } } + @Test + public void testLogicalTableQueryQuota() + throws Exception { + int maxQps = 10; + addQueryQuotaToLogicalTableConfig(maxQps); + verifyQuotaUpdateWithTableName(maxQps, getLogicalTableName()); + runQueries(maxQps, false, "default", getLogicalTableName()); + //increase the qps and some of the queries should be throttled. + runQueries(maxQps * 2, true, "default", getLogicalTableName()); + + // queries on broker + runQueriesOnBroker(maxQps, false, getLogicalTableName()); + //increase the qps and some of the queries should be throttled. + runQueriesOnBroker(maxQps * 2, true, getLogicalTableName()); + } + + @Test + public void testLogicalTableWithDatabaseQueryQuota() + throws Exception { + int databaseMaxQps = 25; + int logicalTableMaxQps = 10; + addQueryQuotaToDatabaseConfig(databaseMaxQps); + addQueryQuotaToLogicalTableConfig(logicalTableMaxQps); + // table quota within database quota. Queries should fail upon table quota (10 qps) breach + verifyQuotaUpdateWithTableName(logicalTableMaxQps, getLogicalTableName()); + runQueries(logicalTableMaxQps, false, "default", getLogicalTableName()); + // queries on broker + runQueriesOnBroker(logicalTableMaxQps, false, getLogicalTableName()); + + //increase the logical table qps. + logicalTableMaxQps = 50; + addQueryQuotaToLogicalTableConfig(logicalTableMaxQps); + verifyQuotaUpdateWithTableName(databaseMaxQps, getLogicalTableName()); + runQueries(databaseMaxQps, false, "default", getLogicalTableName()); + // broker queries + runQueriesOnBroker(databaseMaxQps, false, getLogicalTableName()); + + //increase the qps and some of the queries should be throttled. + runQueries(databaseMaxQps * 2, true, "default", getLogicalTableName()); + runQueriesOnBroker(databaseMaxQps * 2, true, getLogicalTableName()); + } + /** * Runs the query load with the max rate that the quota can allow and ensures queries are not failing. * Then runs the query load with double the max rate and expects queries to fail due to quota breach. @@ -295,15 +348,18 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest private void runQueries(int qps, boolean shouldFail) { runQueries(qps, shouldFail, "default"); } + private void runQueries(int qps, boolean shouldFail, String applicationName) { + runQueries(qps, shouldFail, applicationName, getTableName()); + } // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis // is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps - private void runQueries(int qps, boolean shouldFail, String applicationName) { + private void runQueries(int qps, boolean shouldFail, String applicationName, String tableName) { int failCount = 0; boolean isLastFail = false; long deadline = System.currentTimeMillis() + 1000; - String query = "SET applicationName='" + applicationName + "'; SELECT COUNT(*) FROM " + getTableName(); + String query = "SET applicationName='" + applicationName + "'; SELECT COUNT(*) FROM " + tableName; for (int i = 0; i < qps; i++) { sleep(deadline, qps - i); @@ -333,11 +389,15 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest private static volatile String _quotaSource; private void verifyQuotaUpdate(double quotaQps) { + verifyQuotaUpdateWithTableName(quotaQps, getTableName() + "_OFFLINE"); + } + + private void verifyQuotaUpdateWithTableName(double quotaQps, String tableName) { try { TestUtils.waitForCondition(aVoid -> { try { double tableQuota = Double.parseDouble(sendGetRequest( - "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + getTableName() + "_OFFLINE")); + "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + tableName)); double dbQuota = Double.parseDouble( sendGetRequest("http://" + _brokerHostPort + "/debug/databases/queryQuota/default")); double appQuota = Double.parseDouble( @@ -363,7 +423,7 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } catch (IOException e) { throw new RuntimeException(e); } - }, 10000, "Failed to reflect query quota on rate limiter in 5s."); + }, 10000, "Failed to reflect query quota on rate limiter in 10s."); } catch (AssertionError ae) { throw new AssertionError( ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae); @@ -375,13 +435,17 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } private void runQueriesOnBroker(float qps, boolean shouldFail) { + runQueriesOnBroker(qps, shouldFail, getTableName()); + } + + private void runQueriesOnBroker(float qps, boolean shouldFail, String tableName) { int failCount = 0; long deadline = System.currentTimeMillis() + 1000; for (int i = 0; i < qps; i++) { sleep(deadline, qps - i); BrokerResponse resultSetGroup = - executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); + executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + tableName); for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements(); it.hasNext(); ) { JsonNode exception = it.next(); if (exception.get("errorCode").asInt() == QueryErrorCode.TOO_MANY_REQUESTS.getId()) { @@ -406,6 +470,14 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest // to allow change propagation to QueryQuotaManager } + public void addQueryQuotaToLogicalTableConfig(Integer maxQps) + throws Exception { + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(); + logicalTableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null : maxQps.toString())); + updateLogicalTableConfig(logicalTableConfig); + // to allow change propagation to QueryQuotaManager + } + public void addQueryQuotaToDatabaseConfig(Integer maxQps) throws Exception { String url = _controllerRequestURLBuilder.getBaseUrl() + "/databases/default/quotas"; @@ -453,4 +525,16 @@ public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest } // to allow change propagation to QueryQuotaManager } + + private static String getLogicalTableName() { + return "logical_table"; + } + + private LogicalTableConfig getLogicalTableConfig() { + List<String> physicalTableNames = List.of(TableNameBuilder.OFFLINE.tableNameWithType(getTableName())); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(getLogicalTableName(), physicalTableNames, "DefaultTenant"); + logicalTableConfig.setQuotaConfig(new QuotaConfig(null, null)); + return logicalTableConfig; + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java index 29c6f05516..7fb4802cc1 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java @@ -324,16 +324,6 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra return LogicalTableConfig.fromString(resp); } - protected void updateLogicalTableConfig(String logicalTableName, LogicalTableConfig logicalTableConfig) - throws IOException { - String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName); - String resp = - ControllerTest.sendPutRequest(updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - - assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + getLogicalTableName() - + " logical table successfully updated.\"}"); - } - protected void deleteLogicalTable() throws IOException { String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName()); @@ -448,7 +438,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra QueryConfig queryConfig = new QueryConfig(null, false, null, null, null, null); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); String groovyQuery = "SELECT GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', " + "'arg0 + arg1', FlightNum, Origin) FROM mytable"; @@ -460,7 +450,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra queryConfig = new QueryConfig(null, true, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); // grpc and http throw different exceptions. So only check error message. Exception athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery)); @@ -468,7 +458,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra // Remove query config logicalTableConfig.setQueryConfig(null); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery)); assertTrue(athrows.getMessage().contains("Groovy transform functions are disabled for queries")); @@ -482,7 +472,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra QueryConfig queryConfig = new QueryConfig(null, null, null, null, 100L, null); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); JsonNode response = postQuery(starQuery); JsonNode exceptions = response.get("exceptions"); @@ -492,7 +482,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra // Query Succeeds with a high limit. queryConfig = new QueryConfig(null, null, null, null, 1000000L, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -500,7 +490,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra //Reset to null. queryConfig = new QueryConfig(null, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -514,7 +504,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 1000L); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); JsonNode response = postQuery(starQuery); JsonNode exceptions = response.get("exceptions"); assertTrue(!exceptions.isEmpty() @@ -523,7 +513,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra // Query Succeeds with a high limit. queryConfig = new QueryConfig(null, null, null, null, null, 1000000L); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -531,7 +521,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra //Reset to null. queryConfig = new QueryConfig(null, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -544,7 +534,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null, null); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); JsonNode response = postQuery(starQuery); JsonNode exceptions = response.get("exceptions"); assertTrue( @@ -555,7 +545,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra // Query Succeeds with a high limit. queryConfig = new QueryConfig(1000000L, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -563,7 +553,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra //Reset to null. queryConfig = new QueryConfig(null, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java index ba594a303c..cafa33a56d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java @@ -76,6 +76,6 @@ public class LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters); logicalTableConfig.setQueryConfig(null); - updateLogicalTableConfig(logicalTableConfig.getTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org