Copilot commented on code in PR #17291: URL: https://github.com/apache/pinot/pull/17291#discussion_r2640570977
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java: ########## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import javax.ws.rs.core.HttpHeaders; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.querylog.QueryLogger; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import 
org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableRegistry; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broker request handler for system tables (handled entirely on the broker). + */ +public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class); + + public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + ThreadAccountant threadAccountant) { + super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache, + threadAccountant); + } + + @Override + public void start() { + } + + @Override + public void shutDown() { + } + + public boolean canHandle(String tableName) { + return isSystemTable(tableName) && SystemTableRegistry.isRegistered(tableName); + } + + @Override + protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity 
requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + throws Exception { + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + Set<String> tableNames = RequestUtils.getTableNames(pinotQuery); + if (tableNames == null || tableNames.isEmpty()) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name"); + } + if (tableNames.size() != 1) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins"); + } + String tableName = tableNames.iterator().next(); + if (!isSystemTable(tableName)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not a system table query"); + } + AuthorizationResult authorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); + if (!authorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage()); + } + + return handleSystemTableQuery(pinotQuery, tableName, requestContext, requesterIdentity, query); + } + + @Override + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) { + return false; + } + + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception 
{ + return false; + } + + @Override + public Map<Long, String> getRunningQueries() { + return Collections.emptyMap(); + } + + @Override + public OptionalLong getRequestIdByClientId(String clientQueryId) { + return OptionalLong.empty(); + } + + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String tableName, RequestContext requestContext, + @Nullable RequesterIdentity requesterIdentity, String query) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableDataProvider provider = SystemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + return BrokerResponseNative.TABLE_DOES_NOT_EXIST; + } + try { + if (!isSupportedSystemTableQuery(pinotQuery)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, + "System tables only support simple projection/filter/limit queries"); Review Comment: The error message does not specify which unsupported features were used (e.g., GROUP BY, ORDER BY, HAVING). Include the specific unsupported clause(s) found to help users understand the issue. ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java: ########## @@ -106,7 +113,25 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption } } + Set<String> tableNames = null; + if (_systemTableBrokerRequestHandler != null) { + try { + // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode (e.g. SqlOrderBy -> SqlSelect). + // Re-parse from raw SQL so the SqlNodeAndOptions passed to the handler is unchanged (important for MSE). Review Comment: The comment references 'MSE' without defining the acronym. 
Define the acronym at its first usage (e.g. "MSE, the Multi-Stage Engine") so future maintainers unfamiliar with this shorthand can follow the comment, and wrap the comment so it stays within the line-length limit. ```suggestion // Re-parse from raw SQL so the SqlNodeAndOptions passed to the handler is unchanged // (important for MSE, the Multi-Stage Engine). ``` ########## pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java: ########## @@ -67,6 +67,19 @@ public static String translateTableName(String tableName, @Nullable String datab case 2: Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName); String databasePrefix = tableSplit[0]; + /* + * Allow system tables to bypass database prefix validation so they can be queried regardless of the database + * header. System tables are intended to be globally accessible and are not subject to database-level access + * controls. Ensure system tables do not expose sensitive information because they can be queried without a + * matching database context. + * + * Security note: system tables are intentionally exempt from database-scoped access control; broader Pinot + * security is handled by AccessControl implementations (see org.apache.pinot.spi.security.AccessControl) and + * the Pinot security model documentation. New system tables should be vetted for sensitive content. + */ + if ("system".equalsIgnoreCase(databasePrefix)) { + return tableName; Review Comment: The newly added system table bypass logic lacks corresponding test coverage. Add a test case verifying that system.* tables are correctly exempted from database prefix validation. ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import 
org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final 
@Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) { + List<String> projectionColumns = pinotQuery.getSelectList() != null + ? 
pinotQuery.getSelectList().stream().map(expr -> expr.getIdentifier().getName()).collect(Collectors.toList()) + : List.of(); + if (_tableCache == null) { + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, List.of(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = pinotQuery.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(pinotQuery.getFilterExpression(), stats, rawTableName)) { + continue; + } + if (offset > 0) { + offset--; + totalRows++; + continue; + } + totalRows++; + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + 
row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, rows, totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? 
"ONLINE" : "UNKNOWN"); + String tableConfigJson = ""; + if (tableConfig != null) { + try { + tableConfigJson = JsonUtils.objectToString(tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString()); + tableConfigJson = tableConfig.toString(); + } + } + return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue, + tableConfigJson, brokerTenant, serverTenant, replicas); + } + + private boolean matchesFilter(@Nullable org.apache.pinot.common.request.Expression filterExpression, TableStats stats, + String rawTableName) { + if (filterExpression == null) { + return true; + } + org.apache.pinot.common.request.Function function = filterExpression.getFunctionCall(); + if (function == null) { + return true; + } + FilterKind filterKind = toFilterKind(function.getOperator()); + if (filterKind == null) { + return true; + } + switch (filterKind) { + case AND: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (!matchesFilter(child, stats, rawTableName)) { + return false; + } + } + return true; + case OR: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (matchesFilter(child, stats, rawTableName)) { + return true; + } + } + return false; + case NOT: + if (function.getOperandsSize() == 0) { + return true; + } + return !matchesFilter(function.getOperands().get(0), stats, rawTableName); + default: + return matchesLeafFilter(filterKind, function.getOperands(), stats, rawTableName); + } + } + + private boolean matchesLeafFilter(FilterKind filterKind, + List<org.apache.pinot.common.request.Expression> operands, TableStats stats, String rawTableName) { + String column = extractIdentifier(operands); + if (column == null) { + return true; + } + List<String> values = extractLiteralValues(operands); + if (values.isEmpty()) { + return true; + } + switch (column.toLowerCase(Locale.ROOT)) { + case 
"tablename": + return matchesString(values, rawTableName, filterKind); + case "type": + return matchesString(values, stats._type, filterKind); + case "status": + return matchesString(values, stats._status, filterKind); + case "segments": + return matchesNumber(values, stats._segments, filterKind); + case "reportedsize": + return matchesNumber(values, stats._reportedSizeInBytes, filterKind); + case "estimatedsize": + return matchesNumber(values, stats._estimatedSizeInBytes, filterKind); + default: + return true; + } + } + + private boolean matchesString(List<String> candidates, String actual, FilterKind filterKind) { + switch (filterKind) { + case EQUALS: + case IN: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual)); + default: + return true; + } + } + + private boolean matchesNumber(List<String> candidates, long actual, FilterKind filterKind) { + try { + switch (filterKind) { + case EQUALS: + return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual); + case GREATER_THAN: + return candidates.stream().anyMatch(v -> actual > Long.parseLong(v)); + case GREATER_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v)); + case LESS_THAN: + return candidates.stream().anyMatch(v -> actual < Long.parseLong(v)); + case LESS_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v)); + case IN: + for (String candidate : candidates) { + if (actual == Long.parseLong(candidate)) { + return true; + } + } + return false; + case RANGE: + case BETWEEN: + if (candidates.size() >= 2) { + long lower = Long.parseLong(candidates.get(0)); + long upper = Long.parseLong(candidates.get(1)); + return actual >= lower && actual <= upper; + } + return true; + default: + return true; + } + } catch (NumberFormatException e) { + 
LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString()); + return true; + } + } + + private @Nullable String extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) { + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Identifier identifier = operand.getIdentifier(); + if (identifier != null) { + return identifier.getName(); + } + } + return null; + } + + private List<String> extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) { + List<String> values = new ArrayList<>(); + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Literal literal = operand.getLiteral(); + if (literal != null) { + if (literal.getSetField() == org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) { + values.add("null"); + continue; + } + try { + values.add(RequestUtils.getLiteralString(literal)); + } catch (Exception e) { + values.add(String.valueOf(RequestUtils.getLiteralValue(literal))); + } + } + } + return values; + } + + private @Nullable FilterKind toFilterKind(String operator) { + String normalized = operator.toUpperCase(Locale.ROOT); + switch (normalized) { + case "EQ": + return FilterKind.EQUALS; + case "NEQ": + return FilterKind.NOT_EQUALS; + case "GT": + return FilterKind.GREATER_THAN; + case "GTE": + return FilterKind.GREATER_THAN_OR_EQUAL; + case "LT": + return FilterKind.LESS_THAN; + case "LTE": + return FilterKind.LESS_THAN_OR_EQUAL; + default: + try { + return FilterKind.valueOf(normalized); + } catch (Exception e) { + return null; + } + } + } + + private @Nullable TableSize fetchTableSize(String tableNameWithType) { + TableSize cached = getCachedSize(tableNameWithType); + if (cached != null) { + return cached; + } + Function<String, TableSize> fetcher = getSizeFetcher(); + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + 
cacheSize(tableNameWithType, fetched); + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(rawTableName); + if (size == null) { + size = fetchTableSizeForName(tableNameWithType); + if (size == null) { + LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning null", TABLE_NAME, tableNameWithType, + rawTableName); + } + } + if (size != null) { + cacheSize(tableNameWithType, size); + } + return size; + } + + private @Nullable TableSize fetchTableSizeForName(String tableName) { + for (String baseUrl : getControllerBaseUrls()) { + try { + String url = baseUrl + "/tables/" + tableName + "/size?verbose=true&includeReplacedSegments=false"; + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .timeout(Duration.ofSeconds(5)) + .GET() + .header("Accept", "application/json") + .build(); + HttpResponse<String> response = _httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 200 && response.statusCode() < 300) { + TableSize parsed = JsonUtils.stringToObject(response.body(), TableSize.class); + LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, " + + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl, + parsed._offlineSegments != null && parsed._offlineSegments._segments != null + ? parsed._offlineSegments._segments.size() : 0, + parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null + ? 
parsed._realtimeSegments._segments.size() : 0, + parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes); + return parsed; + } else { + LOGGER.warn("{}: failed to fetch table size for {} via {}: status {}, body={}", TABLE_NAME, tableName, + baseUrl, response.statusCode(), response.body()); + } + } catch (Exception e) { + LOGGER.warn("{}: error fetching table size for {} via {}", TABLE_NAME, tableName, baseUrl, e); + } + } + return null; + } + + private List<String> getControllerBaseUrls() { + Set<String> urls = new LinkedHashSet<>(); + if (_helixAdmin != null) { + for (String controller : discoverControllersFromHelix()) { + String normalized = normalizeControllerUrl(controller); + if (normalized != null) { + urls.add(normalized); + } + } + } + for (String url : _staticControllerUrls) { + String normalized = normalizeControllerUrl(url); + if (normalized != null) { + urls.add(normalized); + } + } + return new ArrayList<>(urls); + } + + private int getSegmentCount(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.size(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + return sizeFromController._realtimeSegments._segments.size(); + } + return 0; + } + + private long getTotalDocs(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + 
return sizeFromController._realtimeSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + return 0; + } + + private @Nullable Function<String, TableSize> getSizeFetcher() { + if (_tableSizeFetcherOverride != null) { + return _tableSizeFetcherOverride; + } + List<String> controllers = getControllerBaseUrls(); + if (controllers.isEmpty() || !isAdminClientAvailable()) { + return null; + } + return tableNameWithType -> fetchWithAdminClient(controllers, tableNameWithType); + } + + private List<String> discoverControllersFromHelix() { + List<String> urls = new ArrayList<>(); + try { + if (_clusterName == null) { + LOGGER.warn("Cannot discover controllers without cluster name"); + return List.of(); + } + for (String controllerId : _helixAdmin.getInstancesInCluster(_clusterName)) { + if (!InstanceTypeUtils.isController(controllerId)) { + continue; + } + int firstUnderscore = controllerId.indexOf('_'); + int lastUnderscore = controllerId.lastIndexOf('_'); + if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && lastUnderscore < controllerId.length() - 1) { + String host = controllerId.substring(firstUnderscore + 1, lastUnderscore); + String port = controllerId.substring(lastUnderscore + 1); + if (!host.isEmpty() && !port.isEmpty()) { + urls.add(host + ":" + port); + } else { + LOGGER.warn("Unable to parse controller address from instance id (empty host/port): {}", controllerId); + } + } else { + LOGGER.warn("Unable to parse controller address from instance id: {}", controllerId); Review Comment: The error message does not explain *why* parsing failed. Consider including details about the expected format or the actual structure encountered to aid debugging. 
```suggestion LOGGER.warn( "Unable to parse controller address from instance id '{}': expected format '<prefix>_<host>_<port>' " + "but got firstUnderscore={}, lastUnderscore={}, length={}", controllerId, firstUnderscore, lastUnderscore, controllerId.length()); ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public 
TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) { + List<String> projectionColumns = pinotQuery.getSelectList() != null + ? 
pinotQuery.getSelectList().stream().map(expr -> expr.getIdentifier().getName()).collect(Collectors.toList()) + : List.of(); + if (_tableCache == null) { + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, List.of(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = pinotQuery.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(pinotQuery.getFilterExpression(), stats, rawTableName)) { + continue; + } + if (offset > 0) { + offset--; + totalRows++; + continue; + } + totalRows++; + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + 
row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, rows, totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? 
"ONLINE" : "UNKNOWN"); + String tableConfigJson = ""; + if (tableConfig != null) { + try { + tableConfigJson = JsonUtils.objectToString(tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString()); + tableConfigJson = tableConfig.toString(); + } + } + return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue, + tableConfigJson, brokerTenant, serverTenant, replicas); + } + + private boolean matchesFilter(@Nullable org.apache.pinot.common.request.Expression filterExpression, TableStats stats, + String rawTableName) { + if (filterExpression == null) { + return true; + } + org.apache.pinot.common.request.Function function = filterExpression.getFunctionCall(); + if (function == null) { + return true; + } + FilterKind filterKind = toFilterKind(function.getOperator()); + if (filterKind == null) { + return true; + } + switch (filterKind) { + case AND: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (!matchesFilter(child, stats, rawTableName)) { + return false; + } + } + return true; + case OR: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (matchesFilter(child, stats, rawTableName)) { + return true; + } + } + return false; + case NOT: + if (function.getOperandsSize() == 0) { + return true; + } + return !matchesFilter(function.getOperands().get(0), stats, rawTableName); + default: + return matchesLeafFilter(filterKind, function.getOperands(), stats, rawTableName); + } + } + + private boolean matchesLeafFilter(FilterKind filterKind, + List<org.apache.pinot.common.request.Expression> operands, TableStats stats, String rawTableName) { + String column = extractIdentifier(operands); + if (column == null) { + return true; + } + List<String> values = extractLiteralValues(operands); + if (values.isEmpty()) { + return true; + } + switch (column.toLowerCase(Locale.ROOT)) { + case 
"tablename": + return matchesString(values, rawTableName, filterKind); + case "type": + return matchesString(values, stats._type, filterKind); + case "status": + return matchesString(values, stats._status, filterKind); + case "segments": + return matchesNumber(values, stats._segments, filterKind); + case "reportedsize": + return matchesNumber(values, stats._reportedSizeInBytes, filterKind); + case "estimatedsize": + return matchesNumber(values, stats._estimatedSizeInBytes, filterKind); + default: + return true; + } + } + + private boolean matchesString(List<String> candidates, String actual, FilterKind filterKind) { + switch (filterKind) { + case EQUALS: + case IN: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual)); + default: + return true; + } + } + + private boolean matchesNumber(List<String> candidates, long actual, FilterKind filterKind) { + try { + switch (filterKind) { + case EQUALS: + return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual); + case GREATER_THAN: + return candidates.stream().anyMatch(v -> actual > Long.parseLong(v)); + case GREATER_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v)); + case LESS_THAN: + return candidates.stream().anyMatch(v -> actual < Long.parseLong(v)); + case LESS_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v)); + case IN: + for (String candidate : candidates) { + if (actual == Long.parseLong(candidate)) { + return true; + } + } + return false; + case RANGE: + case BETWEEN: + if (candidates.size() >= 2) { + long lower = Long.parseLong(candidates.get(0)); + long upper = Long.parseLong(candidates.get(1)); + return actual >= lower && actual <= upper; + } + return true; + default: + return true; + } + } catch (NumberFormatException e) { + 
LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString()); + return true; + } + } + + private @Nullable String extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) { + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Identifier identifier = operand.getIdentifier(); + if (identifier != null) { + return identifier.getName(); + } + } + return null; + } + + private List<String> extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) { + List<String> values = new ArrayList<>(); + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Literal literal = operand.getLiteral(); + if (literal != null) { + if (literal.getSetField() == org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) { + values.add("null"); + continue; + } + try { + values.add(RequestUtils.getLiteralString(literal)); + } catch (Exception e) { + values.add(String.valueOf(RequestUtils.getLiteralValue(literal))); + } + } + } + return values; + } + + private @Nullable FilterKind toFilterKind(String operator) { + String normalized = operator.toUpperCase(Locale.ROOT); + switch (normalized) { + case "EQ": + return FilterKind.EQUALS; + case "NEQ": + return FilterKind.NOT_EQUALS; + case "GT": + return FilterKind.GREATER_THAN; + case "GTE": + return FilterKind.GREATER_THAN_OR_EQUAL; + case "LT": + return FilterKind.LESS_THAN; + case "LTE": + return FilterKind.LESS_THAN_OR_EQUAL; + default: + try { + return FilterKind.valueOf(normalized); + } catch (Exception e) { + return null; + } + } + } + + private @Nullable TableSize fetchTableSize(String tableNameWithType) { + TableSize cached = getCachedSize(tableNameWithType); + if (cached != null) { + return cached; + } + Function<String, TableSize> fetcher = getSizeFetcher(); + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + 
cacheSize(tableNameWithType, fetched); + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(rawTableName); + if (size == null) { + size = fetchTableSizeForName(tableNameWithType); + if (size == null) { + LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning null", TABLE_NAME, tableNameWithType, + rawTableName); + } + } + if (size != null) { + cacheSize(tableNameWithType, size); + } + return size; + } + + private @Nullable TableSize fetchTableSizeForName(String tableName) { + for (String baseUrl : getControllerBaseUrls()) { + try { + String url = baseUrl + "/tables/" + tableName + "/size?verbose=true&includeReplacedSegments=false"; + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .timeout(Duration.ofSeconds(5)) + .GET() + .header("Accept", "application/json") + .build(); + HttpResponse<String> response = _httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 200 && response.statusCode() < 300) { + TableSize parsed = JsonUtils.stringToObject(response.body(), TableSize.class); + LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, " + + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl, + parsed._offlineSegments != null && parsed._offlineSegments._segments != null + ? parsed._offlineSegments._segments.size() : 0, + parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null + ? parsed._realtimeSegments._segments.size() : 0, + parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes); Review Comment: The nested null checks for segment counts make the logging statement difficult to read. 
Extract these calculations into helper methods like getOfflineSegmentCount() and getRealtimeSegmentCount() to improve readability. ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public 
TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) { + List<String> projectionColumns = pinotQuery.getSelectList() != null + ? 
pinotQuery.getSelectList().stream().map(expr -> expr.getIdentifier().getName()).collect(Collectors.toList()) + : List.of(); + if (_tableCache == null) { + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, List.of(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = pinotQuery.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(pinotQuery.getFilterExpression(), stats, rawTableName)) { + continue; + } + if (offset > 0) { + offset--; + totalRows++; + continue; + } + totalRows++; + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + 
row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, rows, totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? 
"ONLINE" : "UNKNOWN"); + String tableConfigJson = ""; + if (tableConfig != null) { + try { + tableConfigJson = JsonUtils.objectToString(tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString()); + tableConfigJson = tableConfig.toString(); + } + } + return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue, + tableConfigJson, brokerTenant, serverTenant, replicas); + } + + private boolean matchesFilter(@Nullable org.apache.pinot.common.request.Expression filterExpression, TableStats stats, + String rawTableName) { + if (filterExpression == null) { + return true; + } + org.apache.pinot.common.request.Function function = filterExpression.getFunctionCall(); + if (function == null) { + return true; + } + FilterKind filterKind = toFilterKind(function.getOperator()); + if (filterKind == null) { + return true; + } + switch (filterKind) { + case AND: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (!matchesFilter(child, stats, rawTableName)) { + return false; + } + } + return true; + case OR: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (matchesFilter(child, stats, rawTableName)) { + return true; + } + } + return false; + case NOT: + if (function.getOperandsSize() == 0) { + return true; + } + return !matchesFilter(function.getOperands().get(0), stats, rawTableName); + default: + return matchesLeafFilter(filterKind, function.getOperands(), stats, rawTableName); + } + } + + private boolean matchesLeafFilter(FilterKind filterKind, + List<org.apache.pinot.common.request.Expression> operands, TableStats stats, String rawTableName) { + String column = extractIdentifier(operands); + if (column == null) { + return true; + } + List<String> values = extractLiteralValues(operands); + if (values.isEmpty()) { + return true; + } + switch (column.toLowerCase(Locale.ROOT)) { + case 
"tablename": + return matchesString(values, rawTableName, filterKind); + case "type": + return matchesString(values, stats._type, filterKind); + case "status": + return matchesString(values, stats._status, filterKind); + case "segments": + return matchesNumber(values, stats._segments, filterKind); + case "reportedsize": + return matchesNumber(values, stats._reportedSizeInBytes, filterKind); + case "estimatedsize": + return matchesNumber(values, stats._estimatedSizeInBytes, filterKind); + default: + return true; + } + } + + private boolean matchesString(List<String> candidates, String actual, FilterKind filterKind) { + switch (filterKind) { + case EQUALS: + case IN: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual)); + default: + return true; + } + } + + private boolean matchesNumber(List<String> candidates, long actual, FilterKind filterKind) { + try { + switch (filterKind) { + case EQUALS: + return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual); + case GREATER_THAN: + return candidates.stream().anyMatch(v -> actual > Long.parseLong(v)); + case GREATER_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v)); + case LESS_THAN: + return candidates.stream().anyMatch(v -> actual < Long.parseLong(v)); + case LESS_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v)); + case IN: + for (String candidate : candidates) { + if (actual == Long.parseLong(candidate)) { + return true; + } + } + return false; + case RANGE: + case BETWEEN: + if (candidates.size() >= 2) { + long lower = Long.parseLong(candidates.get(0)); + long upper = Long.parseLong(candidates.get(1)); + return actual >= lower && actual <= upper; + } + return true; + default: + return true; + } + } catch (NumberFormatException e) { + 
LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString()); + return true; + } + } + + private @Nullable String extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) { + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Identifier identifier = operand.getIdentifier(); + if (identifier != null) { + return identifier.getName(); + } + } + return null; + } + + private List<String> extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) { + List<String> values = new ArrayList<>(); + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Literal literal = operand.getLiteral(); + if (literal != null) { + if (literal.getSetField() == org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) { + values.add("null"); + continue; + } + try { + values.add(RequestUtils.getLiteralString(literal)); + } catch (Exception e) { + values.add(String.valueOf(RequestUtils.getLiteralValue(literal))); + } + } + } + return values; + } + + private @Nullable FilterKind toFilterKind(String operator) { + String normalized = operator.toUpperCase(Locale.ROOT); + switch (normalized) { + case "EQ": + return FilterKind.EQUALS; + case "NEQ": + return FilterKind.NOT_EQUALS; + case "GT": + return FilterKind.GREATER_THAN; + case "GTE": + return FilterKind.GREATER_THAN_OR_EQUAL; + case "LT": + return FilterKind.LESS_THAN; + case "LTE": + return FilterKind.LESS_THAN_OR_EQUAL; + default: + try { + return FilterKind.valueOf(normalized); + } catch (Exception e) { + return null; + } + } + } + + private @Nullable TableSize fetchTableSize(String tableNameWithType) { + TableSize cached = getCachedSize(tableNameWithType); + if (cached != null) { + return cached; + } + Function<String, TableSize> fetcher = getSizeFetcher(); + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + 
cacheSize(tableNameWithType, fetched); + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(rawTableName); + if (size == null) { + size = fetchTableSizeForName(tableNameWithType); + if (size == null) { + LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning null", TABLE_NAME, tableNameWithType, + rawTableName); + } + } + if (size != null) { + cacheSize(tableNameWithType, size); + } + return size; + } + + private @Nullable TableSize fetchTableSizeForName(String tableName) { + for (String baseUrl : getControllerBaseUrls()) { + try { + String url = baseUrl + "/tables/" + tableName + "/size?verbose=true&includeReplacedSegments=false"; + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .timeout(Duration.ofSeconds(5)) + .GET() + .header("Accept", "application/json") + .build(); + HttpResponse<String> response = _httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() >= 200 && response.statusCode() < 300) { + TableSize parsed = JsonUtils.stringToObject(response.body(), TableSize.class); + LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, " + + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl, + parsed._offlineSegments != null && parsed._offlineSegments._segments != null + ? parsed._offlineSegments._segments.size() : 0, + parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null + ? 
parsed._realtimeSegments._segments.size() : 0, + parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes); + return parsed; + } else { + LOGGER.warn("{}: failed to fetch table size for {} via {}: status {}, body={}", TABLE_NAME, tableName, + baseUrl, response.statusCode(), response.body()); + } + } catch (Exception e) { + LOGGER.warn("{}: error fetching table size for {} via {}", TABLE_NAME, tableName, baseUrl, e); + } + } + return null; + } + + private List<String> getControllerBaseUrls() { + Set<String> urls = new LinkedHashSet<>(); + if (_helixAdmin != null) { + for (String controller : discoverControllersFromHelix()) { + String normalized = normalizeControllerUrl(controller); + if (normalized != null) { + urls.add(normalized); + } + } + } + for (String url : _staticControllerUrls) { + String normalized = normalizeControllerUrl(url); + if (normalized != null) { + urls.add(normalized); + } + } + return new ArrayList<>(urls); + } + + private int getSegmentCount(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.size(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + return sizeFromController._realtimeSegments._segments.size(); + } + return 0; + } + + private long getTotalDocs(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + 
return sizeFromController._realtimeSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + return 0; + } + + private @Nullable Function<String, TableSize> getSizeFetcher() { + if (_tableSizeFetcherOverride != null) { + return _tableSizeFetcherOverride; + } + List<String> controllers = getControllerBaseUrls(); + if (controllers.isEmpty() || !isAdminClientAvailable()) { + return null; + } + return tableNameWithType -> fetchWithAdminClient(controllers, tableNameWithType); + } + + private List<String> discoverControllersFromHelix() { + List<String> urls = new ArrayList<>(); + try { + if (_clusterName == null) { + LOGGER.warn("Cannot discover controllers without cluster name"); + return List.of(); + } + for (String controllerId : _helixAdmin.getInstancesInCluster(_clusterName)) { + if (!InstanceTypeUtils.isController(controllerId)) { + continue; + } + int firstUnderscore = controllerId.indexOf('_'); + int lastUnderscore = controllerId.lastIndexOf('_'); + if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && lastUnderscore < controllerId.length() - 1) { + String host = controllerId.substring(firstUnderscore + 1, lastUnderscore); + String port = controllerId.substring(lastUnderscore + 1); + if (!host.isEmpty() && !port.isEmpty()) { + urls.add(host + ":" + port); + } else { + LOGGER.warn("Unable to parse controller address from instance id (empty host/port): {}", controllerId); Review Comment: The error message is unclear—'empty host/port' suggests both are missing, but the condition checks if *either* is empty. Reword to 'empty host or port' for accuracy. 
```suggestion LOGGER.warn("Unable to parse controller address from instance id (empty host or port): {}", controllerId); ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java: ########## @@ -106,7 +113,25 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption } } + Set<String> tableNames = null; + if (_systemTableBrokerRequestHandler != null) { + try { + // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode (e.g. SqlOrderBy -> SqlSelect). + // Re-parse from raw SQL so the SqlNodeAndOptions passed to the handler is unchanged (important for MSE). + JsonNode sql = request.get(Request.SQL); + String sqlQuery = + (sql != null && sql.isTextual()) ? sql.asText() : sqlNodeAndOptions.getSqlNode().toString(); + tableNames = RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery)); + } catch (Exception e) { + // Ignore compilation exceptions here; the selected request handler will surface them appropriately. Review Comment: The comment claims the handler will surface errors, but swallowing exceptions without logging may hide issues during debugging. Consider logging the exception at debug level, or explaining in the comment why silent failure is acceptable. ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, 
TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); Review Comment: The 5-second connect timeout is a magic number, and the appropriate value may differ between deployments. Extract it as a named constant or configuration property to improve maintainability. ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); Review Comment: The 1-minute cache TTL may not suit all deployment scenarios. Consider making this configurable via a system property or constructor parameter for flexibility. ```suggestion private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.system.tables.sizeCacheTtlMs"; private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); private static final long SIZE_CACHE_TTL_MS = Long.getLong(SIZE_CACHE_TTL_MS_PROPERTY, DEFAULT_SIZE_CACHE_TTL_MS); ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); Review Comment: The hardcoded class name 'org.apache.pinot.client.admin.PinotAdminClient' appears both in the static initializer and in fetchWithAdminClient (line 586). Define it as a constant to avoid duplication and ease future refactoring. ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableDataProvider; +import org.apache.pinot.common.systemtable.SystemTableResponseUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.systemtable.SystemTable; +import 
org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableDataProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final long SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + private static final boolean IS_ADMIN_CLIENT_AVAILABLE; + static { + boolean available; + try { + Class.forName("org.apache.pinot.client.admin.PinotAdminClient"); + available = true; + } catch (ClassNotFoundException e) { + available = false; + } + IS_ADMIN_CLIENT_AVAILABLE = available; + } + + private final TableCache _tableCache; + private final 
@Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final HttpClient _httpClient; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _staticControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build(); + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _staticControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) { + List<String> projectionColumns = pinotQuery.getSelectList() != null + ? 
pinotQuery.getSelectList().stream().map(expr -> expr.getIdentifier().getName()).collect(Collectors.toList()) + : List.of(); + if (_tableCache == null) { + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, List.of(), 0); + } + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + int offset = Math.max(0, pinotQuery.getOffset()); + int limit = pinotQuery.getLimit(); + boolean hasLimit = limit > 0; + int totalRows = 0; + int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) : 0; + List<GenericRow> rows = new ArrayList<>(initialCapacity); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableStats stats = buildStats(tableNameWithType, tableType); + if (!matchesFilter(pinotQuery.getFilterExpression(), stats, rawTableName)) { + continue; + } + if (offset > 0) { + offset--; + totalRows++; + continue; + } + totalRows++; + if (limit == 0) { + continue; + } + if (hasLimit && rows.size() >= limit) { + continue; + } + GenericRow row = new GenericRow(); + row.putValue("tableName", rawTableName); + row.putValue("type", stats._type); + row.putValue("status", stats._status); + row.putValue("segments", stats._segments); + row.putValue("totalDocs", stats._totalDocs); + row.putValue("reportedSize", stats._reportedSizeInBytes); + row.putValue("estimatedSize", stats._estimatedSizeInBytes); + row.putValue("storageTier", stats._storageTier); + row.putValue("brokerTenant", stats._brokerTenant); + 
row.putValue("serverTenant", stats._serverTenant); + row.putValue("replicas", stats._replicas); + row.putValue("tableConfig", stats._tableConfig); + rows.add(row); + } + return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, projectionColumns, rows, totalRows); + } + + private TableStats buildStats(String tableNameWithType, TableType tableType) { + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + int segments = 0; + long totalDocs = 0; + long reportedSize = 0; + long estimatedSize = 0; + String tierValue = ""; + String brokerTenant = ""; + String serverTenant = ""; + int replicas = 0; + if (tableConfig != null && tableConfig.getTenantConfig() != null) { + brokerTenant = tableConfig.getTenantConfig().getBroker(); + serverTenant = tableConfig.getTenantConfig().getServer(); + } + if (tableConfig != null && tableConfig.getValidationConfig() != null) { + Integer repl = tableConfig.getValidationConfig().getReplicationNumber(); + replicas = repl != null ? repl : replicas; + } + // Use controller API only + TableSize sizeFromController = fetchTableSize(tableNameWithType); + if (sizeFromController != null) { + if (sizeFromController._reportedSizeInBytes >= 0) { + reportedSize = sizeFromController._reportedSizeInBytes; + } + if (sizeFromController._estimatedSizeInBytes >= 0) { + estimatedSize = sizeFromController._estimatedSizeInBytes; + } + segments = getSegmentCount(sizeFromController, tableType); + totalDocs = getTotalDocs(sizeFromController, tableType); + } + String status = tableConfig != null ? "ONLINE" : (segments > 0 ? 
"ONLINE" : "UNKNOWN"); + String tableConfigJson = ""; + if (tableConfig != null) { + try { + tableConfigJson = JsonUtils.objectToString(tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", tableNameWithType, e.toString()); + tableConfigJson = tableConfig.toString(); + } + } + return new TableStats(tableType.name(), status, segments, totalDocs, reportedSize, estimatedSize, tierValue, + tableConfigJson, brokerTenant, serverTenant, replicas); + } + + private boolean matchesFilter(@Nullable org.apache.pinot.common.request.Expression filterExpression, TableStats stats, + String rawTableName) { + if (filterExpression == null) { + return true; + } + org.apache.pinot.common.request.Function function = filterExpression.getFunctionCall(); + if (function == null) { + return true; + } + FilterKind filterKind = toFilterKind(function.getOperator()); + if (filterKind == null) { + return true; + } + switch (filterKind) { + case AND: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (!matchesFilter(child, stats, rawTableName)) { + return false; + } + } + return true; + case OR: + for (org.apache.pinot.common.request.Expression child : function.getOperands()) { + if (matchesFilter(child, stats, rawTableName)) { + return true; + } + } + return false; + case NOT: + if (function.getOperandsSize() == 0) { + return true; + } + return !matchesFilter(function.getOperands().get(0), stats, rawTableName); + default: + return matchesLeafFilter(filterKind, function.getOperands(), stats, rawTableName); + } + } + + private boolean matchesLeafFilter(FilterKind filterKind, + List<org.apache.pinot.common.request.Expression> operands, TableStats stats, String rawTableName) { + String column = extractIdentifier(operands); + if (column == null) { + return true; + } + List<String> values = extractLiteralValues(operands); + if (values.isEmpty()) { + return true; + } + switch (column.toLowerCase(Locale.ROOT)) { + case 
"tablename": + return matchesString(values, rawTableName, filterKind); + case "type": + return matchesString(values, stats._type, filterKind); + case "status": + return matchesString(values, stats._status, filterKind); + case "segments": + return matchesNumber(values, stats._segments, filterKind); + case "reportedsize": + return matchesNumber(values, stats._reportedSizeInBytes, filterKind); + case "estimatedsize": + return matchesNumber(values, stats._estimatedSizeInBytes, filterKind); + default: + return true; + } + } + + private boolean matchesString(List<String> candidates, String actual, FilterKind filterKind) { + switch (filterKind) { + case EQUALS: + case IN: + return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual)); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual)); + default: + return true; + } + } + + private boolean matchesNumber(List<String> candidates, long actual, FilterKind filterKind) { + try { + switch (filterKind) { + case EQUALS: + return candidates.stream().anyMatch(v -> Long.parseLong(v) == actual); + case NOT_EQUALS: + return candidates.stream().noneMatch(v -> Long.parseLong(v) == actual); + case GREATER_THAN: + return candidates.stream().anyMatch(v -> actual > Long.parseLong(v)); + case GREATER_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual >= Long.parseLong(v)); + case LESS_THAN: + return candidates.stream().anyMatch(v -> actual < Long.parseLong(v)); + case LESS_THAN_OR_EQUAL: + return candidates.stream().anyMatch(v -> actual <= Long.parseLong(v)); + case IN: + for (String candidate : candidates) { + if (actual == Long.parseLong(candidate)) { + return true; + } + } + return false; + case RANGE: + case BETWEEN: + if (candidates.size() >= 2) { + long lower = Long.parseLong(candidates.get(0)); + long upper = Long.parseLong(candidates.get(1)); + return actual >= lower && actual <= upper; + } + return true; + default: + return true; + } + } catch (NumberFormatException e) { + 
LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, e.toString()); + return true; + } + } + + private @Nullable String extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) { + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Identifier identifier = operand.getIdentifier(); + if (identifier != null) { + return identifier.getName(); + } + } + return null; + } + + private List<String> extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) { + List<String> values = new ArrayList<>(); + for (org.apache.pinot.common.request.Expression operand : operands) { + org.apache.pinot.common.request.Literal literal = operand.getLiteral(); + if (literal != null) { + if (literal.getSetField() == org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) { + values.add("null"); + continue; + } + try { + values.add(RequestUtils.getLiteralString(literal)); + } catch (Exception e) { + values.add(String.valueOf(RequestUtils.getLiteralValue(literal))); + } + } + } + return values; + } + + private @Nullable FilterKind toFilterKind(String operator) { + String normalized = operator.toUpperCase(Locale.ROOT); + switch (normalized) { + case "EQ": + return FilterKind.EQUALS; + case "NEQ": + return FilterKind.NOT_EQUALS; + case "GT": + return FilterKind.GREATER_THAN; + case "GTE": + return FilterKind.GREATER_THAN_OR_EQUAL; + case "LT": + return FilterKind.LESS_THAN; + case "LTE": + return FilterKind.LESS_THAN_OR_EQUAL; + default: + try { + return FilterKind.valueOf(normalized); + } catch (Exception e) { + return null; + } + } + } + + private @Nullable TableSize fetchTableSize(String tableNameWithType) { + TableSize cached = getCachedSize(tableNameWithType); + if (cached != null) { + return cached; + } + Function<String, TableSize> fetcher = getSizeFetcher(); + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + 
cacheSize(tableNameWithType, fetched); + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(rawTableName); + if (size == null) { + size = fetchTableSizeForName(tableNameWithType); + if (size == null) { + LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning null", TABLE_NAME, tableNameWithType, + rawTableName); Review Comment: The message states 'returning null' but does not explain *why* both raw and type-suffixed names failed or what the user can do. Consider adding context about potential causes (e.g., table not found, controller unreachable). ```suggestion LOGGER.warn( "{}: failed to fetch table size for {} (raw: {}); no successful response from any controller. This may " + "indicate that the table does not exist or that the controller is unreachable; returning null size", TABLE_NAME, tableNameWithType, rawTableName); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
