Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2617593762


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java:
##########
@@ -67,6 +67,15 @@ public static String translateTableName(String tableName, 
@Nullable String datab
       case 2:
         Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table 
name '%s'", tableName);
         String databasePrefix = tableSplit[0];
+        /*
+         * Allow system tables to bypass database prefix validation so they 
can be queried regardless of the database
+         * header. System tables are intended to be globally accessible and 
are not subject to database-level access
+         * controls. Ensure system tables do not expose sensitive information 
because they can be queried without a
+         * matching database context.

Review Comment:
   The comment warns about sensitive information exposure but doesn't specify 
what validation or access controls are in place. Consider documenting what 
security measures are implemented for system tables or adding a reference to 
where such controls are defined.
   ```suggestion
            * matching database context.
            *
            * Security Note: System tables are intentionally not protected by 
database-level access controls. This is by design,
            * as documented in the Pinot security model (see: 
https://docs.pinot.apache.org/basics/security or the relevant
            * access control implementation in 
org.apache.pinot.spi.security.AccessControl). If you add new system tables,
            * ensure they do not expose sensitive or restricted information.
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;

Review Comment:
   The field name `_staticControllerUrls` is ambiguous. Consider renaming to 
`_configuredControllerUrls` or `_providedControllerUrls` to clarify that these 
are explicitly configured URLs rather than URLs discovered dynamically from 
Helix.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, 
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = 
provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, 
projectionColumns, systemTableResponse,
+              requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, 
tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", 
tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || 
pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || 
pinotQuery.getOrderByList().isEmpty());

Review Comment:
   The method lacks documentation explaining why GROUP BY, HAVING, and ORDER BY 
are unsupported for system tables. Add a comment explaining that these 
operations are not supported because system tables currently only support 
simple projection/filter/limit queries without aggregations or sorting.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -1033,6 +1046,244 @@ private void throwAccessDeniedError(long requestId, 
String query, RequestContext
     throw new WebApplicationException("Permission denied." + failureMessage, 
Response.Status.FORBIDDEN);
   }
 
+  private boolean isSystemTable(String tableName) {
+    return tableName != null && 
tableName.toLowerCase(Locale.ROOT).startsWith("system.");
+  }
+
+  private BrokerResponse handleSystemTableQuery(PinotQuery pinotQuery, String 
tableName,
+      RequestContext requestContext, @Nullable RequesterIdentity 
requesterIdentity, String query) {
+    if (pinotQuery.isExplain()) {
+      return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
+    }
+    SystemTableProvider provider = _systemTableRegistry.get(tableName);
+    if (provider == null) {
+      requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST);
+      return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
+    }
+    try {
+      if (!isSupportedSystemTableQuery(pinotQuery)) {
+        requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+        return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION,
+            "System tables only support simple projection/filter/limit 
queries");
+      }
+      Schema systemSchema = provider.getSchema();
+      List<String> projectionColumns = extractProjectionColumns(pinotQuery, 
systemSchema);
+      int offset = Math.max(0, pinotQuery.getOffset());
+      int limit = Math.max(0, pinotQuery.getLimit());
+      SystemTableRequest systemTableRequest =
+          new SystemTableRequest(projectionColumns, 
toSystemTableFilter(pinotQuery.getFilterExpression()), offset,
+              limit);
+      SystemTableResponse systemTableResponse = 
provider.getRows(systemTableRequest);
+      BrokerResponseNative brokerResponse =
+          buildSystemTableBrokerResponse(tableName, systemSchema, 
projectionColumns, systemTableResponse,
+              requestContext);
+      brokerResponse.setTimeUsedMs(System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis());
+      _queryLogger.log(new QueryLogger.QueryLogParams(requestContext, 
tableName, brokerResponse,
+          QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, null));
+      return brokerResponse;
+    } catch (BadQueryRequestException e) {
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while handling system table query {}: {}", 
tableName, e.getMessage(), e);
+      requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, 
e.getMessage());
+    }
+  }
+
+  private boolean isSupportedSystemTableQuery(PinotQuery pinotQuery) {
+    return (pinotQuery.getGroupByList() == null || 
pinotQuery.getGroupByList().isEmpty())
+        && pinotQuery.getHavingExpression() == null
+        && (pinotQuery.getOrderByList() == null || 
pinotQuery.getOrderByList().isEmpty());
+  }
+
+  private List<String> extractProjectionColumns(PinotQuery pinotQuery, Schema 
schema)
+      throws BadQueryRequestException {
+    List<String> projections = new ArrayList<>();
+    boolean hasStar = false;
+    List<Expression> selectList = pinotQuery.getSelectList();
+    if (CollectionUtils.isEmpty(selectList)) {
+      throw new BadQueryRequestException("System tables require a projection 
list");
+    }
+    for (Expression expression : selectList) {
+      Identifier identifier = expression.getIdentifier();
+      if (identifier != null) {
+        if ("*".equals(identifier.getName())) {
+          hasStar = true;
+        } else {
+          projections.add(identifier.getName());
+        }
+        continue;
+      }
+      Function function = expression.getFunctionCall();
+      if (function != null && "AS".equalsIgnoreCase(function.getOperator()) && 
!function.getOperands().isEmpty()) {
+        Identifier aliased = function.getOperands().get(0).getIdentifier();
+        if (aliased != null) {
+          projections.add(aliased.getName());
+          continue;
+        }
+      }
+      throw new BadQueryRequestException("System tables only support column 
projections or '*'");
+    }
+    if (hasStar || projections.isEmpty()) {
+      projections = new ArrayList<>(schema.getColumnNames());

Review Comment:
   The condition `projections.isEmpty()` should not expand to all columns 
automatically, as an empty projection list is semantically different from 
SELECT *. Either throw a BadQueryRequestException for empty projections, or 
document this fallback behavior explicitly.
   ```suggestion
       if (hasStar) {
         projections = new ArrayList<>(schema.getColumnNames());
       } else if (projections.isEmpty()) {
         throw new BadQueryRequestException("No valid projection columns found 
in SELECT list for system table");
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.systemtable.SystemTableFilter;
+import org.apache.pinot.spi.systemtable.SystemTableProvider;
+import org.apache.pinot.spi.systemtable.SystemTableRequest;
+import org.apache.pinot.spi.systemtable.SystemTableResponse;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+public final class TablesSystemTableProvider implements SystemTableProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final boolean IS_ADMIN_CLIENT_AVAILABLE;
+  static {
+    boolean available;
+    try {
+      Class.forName("org.apache.pinot.client.admin.PinotAdminClient");
+      available = true;
+    } catch (ClassNotFoundException e) {
+      available = false;
+    }
+    IS_ADMIN_CLIENT_AVAILABLE = available;
+  }
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final HttpClient _httpClient;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _httpClient = 
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public SystemTableResponse getRows(SystemTableRequest request) {
+    if (_tableCache == null) {
+      return new SystemTableResponse(List.of(), System.currentTimeMillis(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, request.getOffset());
+    int limit = request.getLimit();
+    boolean hasLimit = limit > 0;
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: 0;
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType);
+      if (!matchesFilter(request.getFilter(), stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (offset > 0) {
+        offset--;
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      GenericRow row = new GenericRow();
+      row.putValue("tableName", rawTableName);
+      row.putValue("type", stats._type);
+      row.putValue("status", stats._status);
+      row.putValue("segments", stats._segments);
+      row.putValue("totalDocs", stats._totalDocs);
+      row.putValue("reportedSize", stats._reportedSizeInBytes);
+      row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+      row.putValue("storageTier", stats._storageTier);
+      row.putValue("brokerTenant", stats._brokerTenant);
+      row.putValue("serverTenant", stats._serverTenant);
+      row.putValue("replicas", stats._replicas);
+      row.putValue("tableConfig", stats._tableConfig);
+      rows.add(row);
+    }
+    return new SystemTableResponse(rows, System.currentTimeMillis(), 
totalRows);
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType) 
{
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = tableConfig != null ? "ONLINE" : (segments > 0 ? "ONLINE" 
: "UNKNOWN");
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(SystemTableFilter filter, TableStats stats, 
String rawTableName) {
+    if (filter == null) {
+      return true;
+    }
+    if (filter.getChildren().isEmpty()) {
+      return matchesLeafFilter(filter, stats, rawTableName);
+    }
+    switch (filter.getOperator()) {
+      case AND:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (SystemTableFilter child : filter.getChildren()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        return filter.getChildren().isEmpty() || 
!matchesFilter(filter.getChildren().get(0), stats, rawTableName);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesLeafFilter(SystemTableFilter filter, TableStats 
stats, String rawTableName) {
+    String column = filter.getColumn();
+    if (column == null) {
+      return true;
+    }
+    List<String> values = filter.getValues();
+    if (values == null || values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase()) {
+      case "tablename":
+        return matchesString(values, rawTableName, filter.getOperator());
+      case "type":
+        return matchesString(values, stats._type, filter.getOperator());
+      case "status":
+        return matchesString(values, stats._status, filter.getOperator());
+      case "segments":
+        return matchesNumber(values, stats._segments, filter.getOperator());
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, 
filter.getOperator());
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, 
filter.getOperator());
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
SystemTableFilter.Operator operator) {
+    switch (operator) {
+      case EQ:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NEQ:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
SystemTableFilter.Operator operator) {
+    try {
+      switch (operator) {
+        case EQ:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NEQ:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GT:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GTE:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LT:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LTE:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType) {
+    Function<String, TableSize> fetcher = getSizeFetcher();
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(tableNameWithType);
+      if (size == null) {
+        LOGGER.warn("{}: failed to fetch size for {} (raw: {}), returning 
null", TABLE_NAME, tableNameWithType,
+            rawTableName);
+      }
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(String tableName) {
+    for (String baseUrl : getControllerBaseUrls()) {
+      try {
+        String url = baseUrl + "/tables/" + tableName + 
"/size?verbose=true&includeReplacedSegments=false";
+        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
+            .timeout(Duration.ofSeconds(5))
+            .GET()
+            .header("Accept", "application/json")
+            .build();
+        HttpResponse<String> response = _httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+        if (response.statusCode() >= 200 && response.statusCode() < 300) {
+          TableSize parsed = JsonUtils.stringToObject(response.body(), 
TableSize.class);
+          LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                  + "reportedSize={}, estimatedSize={}", TABLE_NAME, 
tableName, baseUrl,
+              parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                  ? parsed._offlineSegments._segments.size() : 0,
+              parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                  ? parsed._realtimeSegments._segments.size() : 0,
+              parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+          return parsed;
+        } else {
+          LOGGER.warn("{}: failed to fetch table size for {} via {}: status 
{}, body={}", TABLE_NAME, tableName,
+              baseUrl, response.statusCode(), response.body());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("{}: error fetching table size for {} via {}", TABLE_NAME, 
tableName, baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) 
{
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    List<String> controllers = getControllerBaseUrls();
+    if (controllers.isEmpty() || !isAdminClientAvailable()) {
+      return null;
+    }
+    return tableNameWithType -> fetchWithAdminClient(controllers, 
tableNameWithType);
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int idx = controllerId.lastIndexOf('_');
+        if (idx > 0 && idx < controllerId.length() - 1) {
+          String host = controllerId.substring(controllerId.indexOf('_') + 1, 
idx);
+          String port = controllerId.substring(idx + 1);

Review Comment:
   The host substring starts at `controllerId.indexOf('_')` (the first 
underscore), while `idx` is set to `lastIndexOf('_')`. Mixing the two makes the 
intent hard to follow and could extract an incorrect host string when the 
instance ID contains multiple underscores. Use a single, explicit parsing 
strategy that accounts for the expected `<prefix>_<host>_<port>` format.
   ```suggestion
           String[] parts = controllerId.split("_");
           if (parts.length >= 3) {
             // Join all parts between the first and last as host (to handle 
underscores in host)
             StringBuilder hostBuilder = new StringBuilder();
             for (int i = 1; i < parts.length - 1; i++) {
               if (i > 1) {
                 hostBuilder.append("_");
               }
               hostBuilder.append(parts[i]);
             }
             String host = hostBuilder.toString();
             String port = parts[parts.length - 1];
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to