Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2573627906
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,254 @@ private void throwAccessDeniedError(long requestId, String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage,
         Response.Status.FORBIDDEN);
   }
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, projectionColumns, systemTableResponse, offset,
+              limit, requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection list");
+    }
+    for (Expression expression : selectList) {
+      Identifier identifier = expression.getIdentifier();
+      if (identifier != null) {
+        if ("*".equals(identifier.getName())) {
+          hasStar = true;
+        } else {
+          projections.add(identifier.getName());
+        }
+        continue;
+      }
+      Function function = expression.getFunctionCall();
+      if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && !function.getOperands().isEmpty()) {
+        Identifier aliased = function.getOperands().get(0).getIdentifier();
+        if (aliased != null) {
+          projections.add(aliased.getName());
+          continue;
+        }
+      }
+      throw new BadQueryRequestException("System tables only support column projections or '*'");
+    }
+    if (hasStar || projections.isEmpty()) {
+      projections = new ArrayList<>(schema.getColumnNames());
+    }
+    List<String> normalized = new ArrayList<>(projections.size());
+    for (String column : projections) {
+      if (schema.hasColumn(column)) {
+        normalized.add(column);
+        continue;
+      }
+      String matched = null;
+      for (String schemaColumn : schema.getColumnNames()) {
+        if (schemaColumn.equalsIgnoreCase(column)) {
+          matched = schemaColumn;
+          break;
+        }
+      }
+      if (matched == null) {
+        throw new BadQueryRequestException("Unknown column in system table: " + column);
+      }
+      normalized.add(matched);
+    }
+    return normalized;
+  }
+
+  private BrokerResponseNative buildSystemTableBrokerResponse(String tableName, Schema schema,
+      List<String> projectionColumns, SystemTableResponse response, int offset, int limit,
+      RequestContext requestContext) {
+    DataSchema dataSchema = buildSystemTableDataSchema(schema, projectionColumns);
+    List<GenericRow> rows = response != null ? response.getRows() : Collections.emptyList();
+    List<Object[]> resultRows = new ArrayList<>();
+    if (rows != null && limit != 0) {
+      int skipped = 0;
+      for (GenericRow row : rows) {
+        if (skipped++ < offset) {
+          continue;
+        }
+        if (limit > 0 && resultRows.size() >= limit) {
+          break;
+        }
Review Comment:
The offset is being applied in-memory after fetching all rows from the
provider. Since SystemTableRequest includes offset, providers should handle
offset to avoid fetching and iterating over rows that will be skipped.
```suggestion
      List<String> projectionColumns, SystemTableResponse response, RequestContext requestContext) {
    DataSchema dataSchema = buildSystemTableDataSchema(schema, projectionColumns);
    List<GenericRow> rows = response != null ? response.getRows() : Collections.emptyList();
    List<Object[]> resultRows = new ArrayList<>();
    if (rows != null) {
      for (GenericRow row : rows) {
```
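
For illustration, a provider-side `getRows` that honors the request's offset and limit could look roughly like the sketch below. This is only a sketch against the `SystemTableRequest`/`SystemTableResponse` types in this PR; it assumes the request exposes `getOffset()`/`getLimit()` (offset and limit are constructor arguments in the broker code above), and `scanCandidateRows` is a hypothetical helper standing in for however a provider enumerates matching rows.

```java
// Sketch: pagination applied inside the provider instead of in the broker.
@Override
public SystemTableResponse getRows(SystemTableRequest request) {
  int offset = Math.max(0, request.getOffset());  // assumed getter
  int limit = request.getLimit();                 // assumed getter; <= 0 treated as "no limit"
  List<GenericRow> rows = new ArrayList<>();
  int seen = 0;
  for (GenericRow candidate : scanCandidateRows(request.getFilter())) {  // hypothetical helper
    if (seen++ < offset) {
      continue;  // skip rows before the requested offset without materializing them
    }
    rows.add(candidate);
    if (limit > 0 && rows.size() >= limit) {
      break;  // stop scanning once the requested page is full
    }
  }
  return new SystemTableResponse(rows, System.currentTimeMillis(), rows.size());
}
```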
##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker {@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), rows.size());
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" : "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || !matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString());
+      }
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), falling back to zeros",
+            tableNameWithType, rawTableName);
+      }
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + "/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), TableSize.class);
+          LOGGER.info("system.tables: controller size response for {} via {} -> segments offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+              parsed._offlineSegments != null && parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("system.tables: failed to fetch table size for {} via {}: status {}, body={}", tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("system.tables: error fetching table size for {} via {}: {}", tableName, baseUrl, e.toString(), e);
+        System.out.println("system.tables fetch error for " + tableName + " via " + baseUrl + ": " + e);
Review Comment:
Debug output should use SLF4J logging instead of System.out. A warning is
already logged on line 355, making this print redundant and inconsistent with
the project's logging standards.
```suggestion
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,254 @@ private void throwAccessDeniedError(long requestId, String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage,
         Response.Status.FORBIDDEN);
   }
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, projectionColumns, systemTableResponse, offset,
+              limit, requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection list");
+    }
+    for (Expression expression : selectList) {
+      Identifier identifier = expression.getIdentifier();
+      if (identifier != null) {
+        if ("*".equals(identifier.getName())) {
+          hasStar = true;
+        } else {
+          projections.add(identifier.getName());
+        }
+        continue;
+      }
+      Function function = expression.getFunctionCall();
+      if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && !function.getOperands().isEmpty()) {
+        Identifier aliased = function.getOperands().get(0).getIdentifier();
+        if (aliased != null) {
+          projections.add(aliased.getName());
+          continue;
+        }
+      }
+      throw new BadQueryRequestException("System tables only support column projections or '*'");
+    }
+    if (hasStar || projections.isEmpty()) {
+      projections = new ArrayList<>(schema.getColumnNames());
+    }
+    List<String> normalized = new ArrayList<>(projections.size());
+    for (String column : projections) {
+      if (schema.hasColumn(column)) {
+        normalized.add(column);
+        continue;
+      }
+      String matched = null;
+      for (String schemaColumn : schema.getColumnNames()) {
+        if (schemaColumn.equalsIgnoreCase(column)) {
+          matched = schemaColumn;
+          break;
+        }
+      }
+      if (matched == null) {
+        throw new BadQueryRequestException("Unknown column in system table: " + column);
+      }
+      normalized.add(matched);
+    }
+    return normalized;
+  }
+
+  private BrokerResponseNative buildSystemTableBrokerResponse(String tableName, Schema schema,
+      List<String> projectionColumns, SystemTableResponse response, int offset, int limit,
+      RequestContext requestContext) {
+    DataSchema dataSchema = buildSystemTableDataSchema(schema, projectionColumns);
+    List<GenericRow> rows = response != null ? response.getRows() : Collections.emptyList();
+    List<Object[]> resultRows = new ArrayList<>();
+    if (rows != null && limit != 0) {
+      int skipped = 0;
+      for (GenericRow row : rows) {
+        if (skipped++ < offset) {
+          continue;
+        }
+        if (limit > 0 && resultRows.size() >= limit) {
+          break;
+        }
Review Comment:
The limit is being applied in-memory after fetching all rows from the
provider. Since SystemTableRequest includes limit, providers should handle
limit to avoid fetching unnecessary rows.
```suggestion
    if (rows != null) {
      for (GenericRow row : rows) {
```
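
Even with provider-side pagination, the broker could keep a cheap defensive cap on whatever the provider returns. One option is a small helper like the sketch below (`pageRows` is a hypothetical name; `limit <= 0` is treated as "no limit", mirroring the loop above):

```java
// Hypothetical helper: defensively page an already-materialized row list.
private static <T> List<T> pageRows(List<T> rows, int offset, int limit) {
  if (rows == null || rows.isEmpty()) {
    return List.of();
  }
  int from = Math.min(Math.max(0, offset), rows.size());
  int to = limit > 0 ? Math.min(rows.size(), from + limit) : rows.size();
  return rows.subList(from, to);  // a view over the original list; copy it if it must outlive the source
}
```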
##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker {@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class);
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName("system.tables")
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return "system.tables";
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    List<GenericRow> rows = new ArrayList<>(sortedTableNames.size());
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), rows.size());
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" : "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || !matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString());
+      }
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("system.tables: failed to fetch size for {} (raw: {}), falling back to zeros",
+            tableNameWithType, rawTableName);
+      }
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + "/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), TableSize.class);
+          LOGGER.info("system.tables: controller size response for {} via {} -> segments offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", tableName, baseUrl,
+              parsed._offlineSegments != null && parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("system.tables: failed to fetch table size for {} via {}: status {}, body={}", tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("system.tables: error fetching table size for {} via {}: {}", tableName, baseUrl, e.toString(), e);
+        System.out.println("system.tables fetch error for " + tableName + " via " + baseUrl + ": " + e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    List<String> controllers = getControllerBaseUrls();
+    if (controllers.isEmpty() || !isAdminClientAvailable()) {
+      return null;
+    }
+    return tableNameWithType -> fetchWithAdminClient(controllers, tableNameWithType);
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : _helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int idx = controllerId.lastIndexOf('_');
+        if (idx > 0 && idx < controllerId.length() - 1) {
+          String host = controllerId.substring(controllerId.indexOf('_') + 1, idx);
+          String port = controllerId.substring(idx + 1);
+          urls.add(host + ":" + port);
+        } else {
+          LOGGER.warn("Unable to parse controller address from instance id: {}", controllerId);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to discover controllers from Helix", e);
+    }
+    return urls;
+  }
+
+  private boolean isAdminClientAvailable() {
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      return true;
+    } catch (ClassNotFoundException e) {
+      LOGGER.debug("PinotAdminClient not on classpath; falling back to HTTP for table size fetch");
+      return false;
+    }
Review Comment:
The method performs a reflective class lookup on every call to check
classpath availability. Cache the result in a static final field to avoid
repeated Class.forName operations.
```suggestion
  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
  static {
    boolean available;
    try {
      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
      available = true;
    } catch (ClassNotFoundException e) {
      available = false;
    }
    IS_ADMIN_CLIENT_AVAILABLE = available;
  }
  private boolean isAdminClientAvailable() {
    if (!IS_ADMIN_CLIENT_AVAILABLE) {
      LOGGER.debug("PinotAdminClient not on classpath; falling back to HTTP for table size fetch");
    }
    return IS_ADMIN_CLIENT_AVAILABLE;
```
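
An equivalent alternative, if running the lookup during class initialization is undesirable, is the initialization-on-demand holder idiom; the check then runs once, the first time the flag is read. A sketch:

```java
// Sketch: lazy, thread-safe, single Class.forName lookup.
private static final class AdminClientHolder {
  static final boolean AVAILABLE = checkAvailable();

  private static boolean checkAvailable() {
    try {
      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}

private boolean isAdminClientAvailable() {
  return AdminClientHolder.AVAILABLE;
}
```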
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -67,6 +67,10 @@ public static String translateTableName(String tableName, @Nullable String datab
       case 2:
         Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName);
         String databasePrefix = tableSplit[0];
+        // Allow system tables to bypass database prefix validation so they can be queried regardless of header.
Review Comment:
[nitpick] The inline comment on line 70 already notes that this bypass lets system tables be queried regardless of the database header, but it would benefit from additional context about the security implications and expected usage patterns.
```suggestion
        /**
         * Allow system tables to bypass database prefix validation so they can be queried regardless of header.
         * System tables are globally accessible and are not subject to database-level access controls.
         * This is intentional, as system tables typically provide metadata or monitoring information
         * that should be available across all databases. Care should be taken to ensure that system tables
         * do not expose sensitive information, as they are accessible regardless of the database context
         * provided in the request header.
         */
```
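
For context, the bypass this comment documents presumably sits right after the prefix is extracted. A hypothetical sketch only: the actual added lines are not shown in this hunk, and the `"system"` prefix check is an assumption based on the table-naming convention used elsewhere in the PR.

```java
String databasePrefix = tableSplit[0];
// System tables keep their fully qualified "system.<name>" form and skip the
// database-header check: they expose cluster-wide metadata, not per-database data.
if ("system".equalsIgnoreCase(databasePrefix)) {
  return tableName;  // hypothetical bypass; see the added comment above
}
```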
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]