Copilot commented on code in PR #17291:
URL: https://github.com/apache/pinot/pull/17291#discussion_r2643405446


##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/InstancesSystemTableProvider.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * Basic system table exposing Pinot instance metadata.
+ */
+@SystemTable
+public final class InstancesSystemTableProvider implements SystemTableDataProvider {
+  public static final String TABLE_NAME = "system.instances";
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("instanceId", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("host", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("port", FieldSpec.DataType.INT)
+      .addSingleValueDimension("state", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("tags", FieldSpec.DataType.STRING)
+      .build();
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    // Controller/Helix fetch can be added later; for now return an empty result.
+    List<String> projectionColumns = pinotQuery.getSelectList() != null
+        ? pinotQuery.getSelectList().stream().map(expr -> expr.getIdentifier().getName()).collect(Collectors.toList())

Review Comment:
   The lambda calls `expr.getIdentifier().getName()` on every select 
expression without a null check. If `expr.getIdentifier()` returns null for any 
expression, this throws a NullPointerException. Add a null check, or reuse the 
pattern in `TablesSystemTableProvider.getProjectionColumns`, which already 
handles this case.
   ```suggestion
           ? pinotQuery.getSelectList().stream()
               .map(expr -> expr.getIdentifier())
               .filter(identifier -> identifier != null)
               .map(identifier -> identifier.getName())
               .collect(Collectors.toList())
   ```
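
   For reference, the null-safe extraction can be exercised outside Pinot 
with a minimal sketch. The `Identifier` and `Expression` classes below are 
hypothetical stand-ins for the Thrift-generated request types, and 
`projectionColumns` is an illustrative helper name, not an existing Pinot 
method.

   ```java
   import java.util.List;
   import java.util.Objects;
   import java.util.stream.Collectors;

   final class ProjectionColumnsSketch {
     // Hypothetical stand-in for the Thrift-generated Identifier type.
     static final class Identifier {
       private final String _name;

       Identifier(String name) {
         _name = name;
       }

       String getName() {
         return _name;
       }
     }

     // Hypothetical stand-in for Expression; the identifier may be null.
     static final class Expression {
       private final Identifier _identifier;

       Expression(Identifier identifier) {
         _identifier = identifier;
       }

       Identifier getIdentifier() {
         return _identifier;
       }
     }

     // Collects the names of expressions that carry an identifier and skips the rest.
     // A null select list yields an empty projection.
     static List<String> projectionColumns(List<Expression> selectList) {
       if (selectList == null) {
         return List.of();
       }
       return selectList.stream()
           .map(Expression::getIdentifier)
           .filter(Objects::nonNull)
           .map(Identifier::getName)
           .collect(Collectors.toList());
     }
   }
   ```

   Note that this silently drops expressions without an identifier; whether 
that or an explicit error is the right behavior is a design choice for the 
provider.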



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -106,7 +115,27 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
       }
     }
 
+    Set<String> tableNames = null;
+    if (_systemTableBrokerRequestHandler != null) {
+      try {
+        // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode (e.g. SqlOrderBy -> SqlSelect).
+        // Re-parse from raw SQL so the SqlNodeAndOptions passed to the handler is unchanged (important for
+        // Multi-Stage Engine (MSE)).
+        JsonNode sql = request.get(Request.SQL);
+        String sqlQuery =
+            (sql != null && sql.isTextual()) ? sql.asText() : sqlNodeAndOptions.getSqlNode().toString();
+        tableNames = RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery));

Review Comment:
   The `sqlNodeAndOptions` parameter is annotated `@Nullable`. When the request 
has no textual `sql` field, the fallback calls `sqlNodeAndOptions.getSqlNode()`, 
which throws a NullPointerException if the parameter is null. Add a null check 
for `sqlNodeAndOptions` before accessing `getSqlNode()`.
   ```suggestion
           String sqlQuery;
           if (sql != null && sql.isTextual()) {
             sqlQuery = sql.asText();
           } else if (sqlNodeAndOptions != null) {
             sqlQuery = sqlNodeAndOptions.getSqlNode().toString();
           } else {
             // If both the raw SQL and SqlNodeAndOptions are unavailable, skip determining table names.
             sqlQuery = null;
           }
           if (sqlQuery != null) {
             tableNames = RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery));
           }
   ```
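
   The same fallback order can also be written with `Optional`, which makes 
the "no SQL available" case explicit in the return type. This is only a sketch: 
`ParsedSql` is a hypothetical stand-in for `SqlNodeAndOptions`, and `resolveSql` 
is an illustrative name, not part of the PR.

   ```java
   import java.util.Optional;

   final class SqlTextSketch {
     // Hypothetical stand-in for SqlNodeAndOptions; only the node text matters here.
     static final class ParsedSql {
       private final String _nodeText;

       ParsedSql(String nodeText) {
         _nodeText = nodeText;
       }

       String getNodeText() {
         return _nodeText;
       }
     }

     // Prefers the raw SQL from the request, falls back to the parsed node text,
     // and returns an empty Optional when neither is available.
     static Optional<String> resolveSql(String rawSql, ParsedSql parsed) {
       if (rawSql != null) {
         return Optional.of(rawSql);
       }
       return Optional.ofNullable(parsed).map(ParsedSql::getNodeText);
     }
   }
   ```

   A caller would then compute table names only when the result is present, 
for example through `ifPresent`, and leave `tableNames` as null otherwise.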



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : 
_adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", 
entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = getProjectionColumns(pinotQuery);
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    org.apache.pinot.common.request.Expression filterExpression = 
pinotQuery.getFilterExpression();
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+
+    if (filterExpression == null) {
+      int totalRows = sortedTableNames.size();
+      int startIndex = Math.min(offset, totalRows);
+      int endIndex = totalRows;
+      if (limit == 0) {
+        endIndex = startIndex;
+      } else if (hasLimit) {
+        endIndex = Math.min(totalRows, startIndex + limit);
+      }
+      List<GenericRow> rows = new ArrayList<>(Math.max(0, endIndex - 
startIndex));
+      for (int i = startIndex; i < endIndex; i++) {
+        String tableNameWithType = sortedTableNames.get(i);
+        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+        if (tableType == null) {
+          continue;
+        }
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+        TableStats stats = buildStats(tableNameWithType, tableType, 
sizeFetcher, controllerBaseUrls);
+        rows.add(buildRow(rawTableName, stats));
+      }
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+    }
+
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: sortedTableNames.size();
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType, sizeFetcher, 
controllerBaseUrls);
+      if (!matchesFilter(filterExpression, stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (totalRows <= offset) {
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      rows.add(buildRow(rawTableName, stats));
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private GenericRow buildRow(String rawTableName, TableStats stats) {
+    GenericRow row = new GenericRow();
+    row.putValue("tableName", rawTableName);
+    row.putValue("type", stats._type);
+    row.putValue("status", stats._status);
+    row.putValue("segments", stats._segments);
+    row.putValue("totalDocs", stats._totalDocs);
+    row.putValue("reportedSize", stats._reportedSizeInBytes);
+    row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+    row.putValue("storageTier", stats._storageTier);
+    row.putValue("brokerTenant", stats._brokerTenant);
+    row.putValue("serverTenant", stats._serverTenant);
+    row.putValue("replicas", stats._replicas);
+    row.putValue("tableConfig", stats._tableConfig);
+    return row;
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType,
+      @Nullable Function<String, TableSize> tableSizeFetcher, List<String> 
controllerBaseUrls) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType, 
tableSizeFetcher, controllerBaseUrls);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = (tableConfig != null || segments > 0) ? "ONLINE" : 
"UNKNOWN";
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> 
controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers {} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, tableNameWithType);
+      }

Review Comment:
   The log message format references both `rawTableName` and 
`tableNameWithType` but only tries `tableNameWithType` in this branch (line 
487). The message says 'tried ... and ...' but only one attempt was made. This 
could confuse someone debugging. Consider revising the message to accurately 
reflect the attempts made in this specific code path.
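
   One way to keep such a message accurate regardless of which lookups ran is 
to record each name as it is tried and build the message from that list. The 
sketch below is self-contained and uses hypothetical names 
(`fetchOrDescribeFailure`, a `Function<String, Long>` fetcher returning null on 
a miss); it does not use the PR's `fetchTableSizeForName` or 
`logSizeFetchFailure`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Function;

   final class SizeFetchAttemptsSketch {
     // Tries each candidate name in order and stops at the first hit.
     // On failure, the message lists exactly the names that were attempted.
     static String fetchOrDescribeFailure(List<String> candidates, Function<String, Long> fetcher) {
       List<String> attempted = new ArrayList<>();
       for (String name : candidates) {
         attempted.add(name);
         Long size = fetcher.apply(name);
         if (size != null) {
           return "size=" + size;
         }
       }
       return "failed to fetch size (tried " + attempted + ")";
     }
   }
   ```

   Because the message is derived from `attempted`, it cannot drift from the 
lookup logic if the fallback order changes later.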



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : 
_adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", 
entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = getProjectionColumns(pinotQuery);
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    org.apache.pinot.common.request.Expression filterExpression = 
pinotQuery.getFilterExpression();
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+
+    if (filterExpression == null) {
+      int totalRows = sortedTableNames.size();
+      int startIndex = Math.min(offset, totalRows);
+      int endIndex = totalRows;
+      if (limit == 0) {
+        endIndex = startIndex;
+      } else if (hasLimit) {
+        endIndex = Math.min(totalRows, startIndex + limit);
+      }
+      List<GenericRow> rows = new ArrayList<>(Math.max(0, endIndex - 
startIndex));
+      for (int i = startIndex; i < endIndex; i++) {
+        String tableNameWithType = sortedTableNames.get(i);
+        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+        if (tableType == null) {
+          continue;
+        }
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+        TableStats stats = buildStats(tableNameWithType, tableType, 
sizeFetcher, controllerBaseUrls);
+        rows.add(buildRow(rawTableName, stats));
+      }
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+    }
+
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: sortedTableNames.size();
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType, sizeFetcher, 
controllerBaseUrls);
+      if (!matchesFilter(filterExpression, stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (totalRows <= offset) {
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      rows.add(buildRow(rawTableName, stats));
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private GenericRow buildRow(String rawTableName, TableStats stats) {
+    GenericRow row = new GenericRow();
+    row.putValue("tableName", rawTableName);
+    row.putValue("type", stats._type);
+    row.putValue("status", stats._status);
+    row.putValue("segments", stats._segments);
+    row.putValue("totalDocs", stats._totalDocs);
+    row.putValue("reportedSize", stats._reportedSizeInBytes);
+    row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+    row.putValue("storageTier", stats._storageTier);
+    row.putValue("brokerTenant", stats._brokerTenant);
+    row.putValue("serverTenant", stats._serverTenant);
+    row.putValue("replicas", stats._replicas);
+    row.putValue("tableConfig", stats._tableConfig);
+    return row;
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType,
+      @Nullable Function<String, TableSize> tableSizeFetcher, List<String> 
controllerBaseUrls) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType, 
tableSizeFetcher, controllerBaseUrls);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = (tableConfig != null || segments > 0) ? "ONLINE" : 
"UNKNOWN";
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> 
controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers 
{} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, 
tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(List<String> 
controllerBaseUrls, String tableName) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        Object sizeNode;
+        try {
+          sizeNode = clientClass.getMethod("getTableSize", String.class, 
boolean.class, boolean.class)
+              .invoke(adminClient, tableName, true, false);
+        } catch (NoSuchMethodException e) {
+          Object tableClient = 
clientClass.getMethod("getTableClient").invoke(adminClient);
+          sizeNode = tableClient.getClass().getMethod("getTableSize", 
String.class, boolean.class, boolean.class)
+              .invoke(tableClient, tableName, true, false);
+        }
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), 
TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, 
baseUrl,
+            parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using 
admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) 
{
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    return null;
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && 
lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, 
lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);

Review Comment:
   The controller ID parsing logic encodes the expected format 
`Controller_<host>_<port>` in two places: the substring operations here and the 
warning message (line 617). Consider extracting the parsing into a separate 
utility method that documents the expected format. This would make the format 
expectation explicit and the parsing easier to test.
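
   A minimal sketch of such a utility follows. The class, method, and field 
names are hypothetical and not part of this PR. It assumes the format 
`Controller_<host>_<port>` and, as a further assumption about the desired 
behavior, rejects an empty host and a non-numeric or out-of-range port.

   ```java
   import java.util.Optional;

   // Hypothetical helper; names are illustrative and not part of the PR.
   final class ControllerInstanceIds {
     private static final String PREFIX = "Controller_";

     private ControllerInstanceIds() {
     }

     /** Host and port extracted from an ID of the form Controller_<host>_<port>. */
     static final class HostPort {
       final String host;
       final int port;

       HostPort(String host, int port) {
         this.host = host;
         this.port = port;
       }
     }

     /**
      * Parses Controller_<host>_<port>. The host is everything between the prefix
      * and the last underscore, so hosts containing underscores stay intact.
      * Returns empty when the ID does not match the expected format.
      */
     static Optional<HostPort> parse(String instanceId) {
       if (!instanceId.startsWith(PREFIX)) {
         return Optional.empty();
       }
       int lastUnderscore = instanceId.lastIndexOf('_');
       // Reject an empty host (no second underscore) and an empty port.
       if (lastUnderscore <= PREFIX.length() || lastUnderscore == instanceId.length() - 1) {
         return Optional.empty();
       }
       String host = instanceId.substring(PREFIX.length(), lastUnderscore);
       try {
         int port = Integer.parseInt(instanceId.substring(lastUnderscore + 1));
         if (port < 0 || port > 65535) {
           return Optional.empty();
         }
         return Optional.of(new HostPort(host, port));
       } catch (NumberFormatException e) {
         return Optional.empty();
       }
     }
   }
   ```

   Returning `Optional` keeps the "unexpected format" case in one place, so the 
caller can log a single warning that names the expected format.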



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -106,7 +115,27 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable SqlNodeAndOption
       }
     }
 
+    Set<String> tableNames = null;
+    if (_systemTableBrokerRequestHandler != null) {
+      try {
+        // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode 
(e.g. SqlOrderBy -> SqlSelect).
+        // Re-parse from raw SQL so the SqlNodeAndOptions passed to the 
handler is unchanged (important for
+        // Multi-Stage Engine (MSE)).
+        JsonNode sql = request.get(Request.SQL);
+        String sqlQuery =
+            (sql != null && sql.isTextual()) ? sql.asText() : 
sqlNodeAndOptions.getSqlNode().toString();
+        tableNames = 
RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery));
+      } catch (Exception e) {
+        // Ignore compilation exceptions here; the selected request handler 
will surface them appropriately.
+        LOGGER.debug("Failed to compile query while determining request 
handler", e);
+      }

Review Comment:
   `_systemTableBrokerRequestHandler` is null-checked here and again at lines 
134-137, but `start()` and `shutDown()` use it at lines 74 and 86 without a 
null check. If the handler can be null, those methods will throw 
`NullPointerException`. Either make the handler non-nullable in the constructor 
or add null checks in the lifecycle methods.
   ```suggestion
       try {
         // NOTE: compileToPinotQuery(SqlNodeAndOptions) mutates the SqlNode 
(e.g. SqlOrderBy -> SqlSelect).
         // Re-parse from raw SQL so the SqlNodeAndOptions passed to the 
handler is unchanged (important for
         // Multi-Stage Engine (MSE)).
         JsonNode sql = request.get(Request.SQL);
         String sqlQuery =
             (sql != null && sql.isTextual()) ? sql.asText() : 
sqlNodeAndOptions.getSqlNode().toString();
         tableNames = 
RequestUtils.getTableNames(CalciteSqlParser.compileToPinotQuery(sqlQuery));
       } catch (Exception e) {
         // Ignore compilation exceptions here; the selected request handler 
will surface them appropriately.
         LOGGER.debug("Failed to compile query while determining request 
handler", e);
   ```
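
   If the handler has to remain optional instead, the other option mentioned 
above (null checks in the lifecycle methods) could look roughly like the sketch 
below. `LifecycleHandler` and `DelegateSketch` are hypothetical stand-ins, not 
the actual Pinot classes.

   ```java
   import java.util.Objects;

   // Hypothetical stand-in for a request handler with lifecycle methods.
   interface LifecycleHandler {
     void start();

     void shutDown();
   }

   // Sketch of a delegate that tolerates an absent optional handler.
   final class DelegateSketch {
     private final LifecycleHandler _required;
     private final LifecycleHandler _optional; // may be null

     DelegateSketch(LifecycleHandler required, LifecycleHandler optional) {
       _required = Objects.requireNonNull(required, "required handler");
       _optional = optional;
     }

     void start() {
       _required.start();
       // Guard every use of the optional handler, not only the query path.
       if (_optional != null) {
         _optional.start();
       }
     }

     void shutDown() {
       _required.shutDown();
       if (_optional != null) {
         _optional.shutDown();
       }
     }
   }
   ```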



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/SystemTableRegistry.java:
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Registry to hold and lifecycle-manage system table data providers.
+ */
+public final class SystemTableRegistry {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SystemTableRegistry.class);
+
+  // Providers are registered once at broker startup.
+  private static final Map<String, SystemTableDataProvider> PROVIDERS = new 
HashMap<>();
+
+  private SystemTableRegistry() {
+    throw new UnsupportedOperationException("This is a utility class and 
cannot be instantiated");
+  }
+
+  public static void register(SystemTableDataProvider provider) {
+    synchronized (PROVIDERS) {
+      PROVIDERS.put(normalize(provider.getTableName()), provider);
+    }
+  }
+
+  @Nullable
+  public static SystemTableDataProvider get(String tableName) {
+    synchronized (PROVIDERS) {
+      return PROVIDERS.get(normalize(tableName));
+    }
+  }
+
+  public static boolean isRegistered(String tableName) {
+    synchronized (PROVIDERS) {
+      return PROVIDERS.containsKey(normalize(tableName));
+    }
+  }
+
+  public static Collection<SystemTableDataProvider> getProviders() {
+    synchronized (PROVIDERS) {
+      return Collections.unmodifiableCollection(new 
java.util.ArrayList<>(PROVIDERS.values()));
+    }
+  }
+
+  /**
+   * Discover and register providers marked with {@link SystemTable} using the 
available dependencies.
+   * Follows the ScalarFunction pattern: any class annotated with @SystemTable 
under a "*.systemtable.*" package
+   * will be discovered via reflection and registered.
+   */
+  public static void registerAnnotatedProviders(TableCache tableCache, 
HelixAdmin helixAdmin, String clusterName) {
+    Set<Class<?>> classes =
+        
PinotReflectionUtils.getClassesThroughReflection(".*\\.systemtable\\..*", 
SystemTable.class);
+    for (Class<?> clazz : classes) {
+      if (!SystemTableDataProvider.class.isAssignableFrom(clazz)) {
+        continue;
+      }
+      SystemTableDataProvider provider = instantiateProvider(
+          clazz.asSubclass(SystemTableDataProvider.class), tableCache, 
helixAdmin, clusterName);
+      if (provider == null) {
+        continue;
+      }
+      if (isRegistered(provider.getTableName())) {
+        continue;
+      }
+      LOGGER.info("Registering system table provider: {}", 
provider.getTableName());
+      register(provider);
+    }
+  }
+
+  public static void close()
+      throws Exception {
+    Exception firstException = null;
+    // Snapshot providers to avoid concurrent modifications and to close each 
provider at most once.
+    Map<SystemTableDataProvider, Boolean> providersToClose = new 
IdentityHashMap<>();
+    synchronized (PROVIDERS) {
+      for (SystemTableDataProvider provider : PROVIDERS.values()) {
+        providersToClose.put(provider, Boolean.TRUE);
+      }
+    }
+    try {
+      for (SystemTableDataProvider provider : providersToClose.keySet()) {
+        try {
+          provider.close();
+        } catch (Exception e) {
+          if (firstException == null) {
+            firstException = e;
+          } else {
+            firstException.addSuppressed(e);
+          }
+        }
+      }
+    } finally {
+      synchronized (PROVIDERS) {
+        PROVIDERS.clear();
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  /**
+   * Initialize and register all annotated system table providers.
+   */
+  public static void init(TableCache tableCache, HelixAdmin helixAdmin, String 
clusterName) {
+    registerAnnotatedProviders(tableCache, helixAdmin, clusterName);
+  }
+
+  private static String normalize(String tableName) {
+    return tableName.toLowerCase(Locale.ROOT);
+  }
+
+  private static @Nullable SystemTableDataProvider instantiateProvider(Class<? 
extends SystemTableDataProvider> clazz,
+      TableCache tableCache, HelixAdmin helixAdmin, String clusterName) {
+    try {
+      // Prefer the most specific constructor available.
+      try {
+        return clazz.getDeclaredConstructor(TableCache.class, 
HelixAdmin.class, String.class)
+            .newInstance(tableCache, helixAdmin, clusterName);
+      } catch (NoSuchMethodException ignored) {
+        // fall through
+      }
+      try {
+        return clazz.getDeclaredConstructor(TableCache.class, 
HelixAdmin.class).newInstance(tableCache, helixAdmin);
+      } catch (NoSuchMethodException ignored) {
+        // fall through
+      }
+      try {
+        return 
clazz.getDeclaredConstructor(TableCache.class).newInstance(tableCache);
+      } catch (NoSuchMethodException ignored) {
+        // fall through
+      }
+      return clazz.getDeclaredConstructor().newInstance();

Review Comment:
   `getDeclaredConstructor()` matches constructors of any visibility, so this 
lookup can select a non-public constructor; `newInstance()` then either fails 
with `IllegalAccessException` or, where access is permitted, instantiates the 
provider through a constructor that was not meant to be public API. This could 
be a concern if system table providers are expected to control instantiation. 
Consider documenting that system table provider constructors should be public, 
or use `getConstructor()` instead of `getDeclaredConstructor()` to enforce 
public constructors.
   ```suggestion
           return clazz.getConstructor(TableCache.class, HelixAdmin.class, 
String.class)
               .newInstance(tableCache, helixAdmin, clusterName);
         } catch (NoSuchMethodException ignored) {
           // fall through
         }
         try {
           return clazz.getConstructor(TableCache.class, 
HelixAdmin.class).newInstance(tableCache, helixAdmin);
         } catch (NoSuchMethodException ignored) {
           // fall through
         }
         try {
           return 
clazz.getConstructor(TableCache.class).newInstance(tableCache);
         } catch (NoSuchMethodException ignored) {
           // fall through
         }
         return clazz.getConstructor().newInstance();
   ```
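
   The difference between the two lookups can be seen in a small standalone 
sketch. `HiddenProvider`, `OpenProvider`, and `ConstructorLookupDemo` are 
hypothetical classes used only for illustration.

   ```java
   // Hypothetical provider whose only constructor is private.
   final class HiddenProvider {
     private HiddenProvider() {
     }
   }

   // Hypothetical provider with a public no-arg constructor.
   final class OpenProvider {
     public OpenProvider() {
     }
   }

   final class ConstructorLookupDemo {
     private ConstructorLookupDemo() {
     }

     // True only if the class has a public no-arg constructor.
     static boolean hasPublicNoArgConstructor(Class<?> clazz) {
       try {
         clazz.getConstructor();
         return true;
       } catch (NoSuchMethodException e) {
         return false;
       }
     }

     // True if the class declares a no-arg constructor of any visibility.
     static boolean hasDeclaredNoArgConstructor(Class<?> clazz) {
       try {
         clazz.getDeclaredConstructor();
         return true;
       } catch (NoSuchMethodException e) {
         return false;
       }
     }
   }
   ```

   With `getConstructor()`, a provider that only declares non-public 
constructors surfaces as `NoSuchMethodException`, which the existing fallback 
chain already handles.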



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : 
_adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", 
entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = getProjectionColumns(pinotQuery);
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    org.apache.pinot.common.request.Expression filterExpression = 
pinotQuery.getFilterExpression();
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+
+    if (filterExpression == null) {
+      int totalRows = sortedTableNames.size();
+      int startIndex = Math.min(offset, totalRows);
+      int endIndex = totalRows;
+      if (limit == 0) {
+        endIndex = startIndex;
+      } else if (hasLimit) {
+        endIndex = Math.min(totalRows, startIndex + limit);
+      }
+      List<GenericRow> rows = new ArrayList<>(Math.max(0, endIndex - 
startIndex));
+      for (int i = startIndex; i < endIndex; i++) {
+        String tableNameWithType = sortedTableNames.get(i);
+        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+        if (tableType == null) {
+          continue;
+        }
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+        TableStats stats = buildStats(tableNameWithType, tableType, 
sizeFetcher, controllerBaseUrls);
+        rows.add(buildRow(rawTableName, stats));
+      }
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+    }
+
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: sortedTableNames.size();
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType, sizeFetcher, 
controllerBaseUrls);
+      if (!matchesFilter(filterExpression, stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (totalRows <= offset) {
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      rows.add(buildRow(rawTableName, stats));
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private GenericRow buildRow(String rawTableName, TableStats stats) {
+    GenericRow row = new GenericRow();
+    row.putValue("tableName", rawTableName);
+    row.putValue("type", stats._type);
+    row.putValue("status", stats._status);
+    row.putValue("segments", stats._segments);
+    row.putValue("totalDocs", stats._totalDocs);
+    row.putValue("reportedSize", stats._reportedSizeInBytes);
+    row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+    row.putValue("storageTier", stats._storageTier);
+    row.putValue("brokerTenant", stats._brokerTenant);
+    row.putValue("serverTenant", stats._serverTenant);
+    row.putValue("replicas", stats._replicas);
+    row.putValue("tableConfig", stats._tableConfig);
+    return row;
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType,
+      @Nullable Function<String, TableSize> tableSizeFetcher, List<String> 
controllerBaseUrls) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType, 
tableSizeFetcher, controllerBaseUrls);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = (tableConfig != null || segments > 0) ? "ONLINE" : 
"UNKNOWN";
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");

Review Comment:
   Converting NULL literals to the string "null" is inconsistent with the rest
   of the filter matching. `matchesString` compares candidates
   case-insensitively, so a predicate carrying a NULL literal would match a
   value that is literally the text "null". `matchesNumber` passes the same
   string to `Long.parseLong`, which throws `NumberFormatException`; the catch
   block returns true, so the predicate matches every row. Consider either
   documenting this behavior or handling NULL literals consistently throughout
   the filter evaluation logic.
   ```suggestion
             // Skip NULL_VALUE literals instead of treating them as the string "null"
   ```
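
One caveat with simply skipping the literal: `matchesLeafFilter` returns true when `values` is empty, so a predicate whose only literal is NULL would then match every row. Below is a minimal sketch of one way to keep the two cases apart. It is not the PR's code: the class `NullAwareFilterSketch`, its `Kind` enum, and the choice to model a NULL literal as a Java `null` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for the provider's leaf-filter helpers. */
final class NullAwareFilterSketch {
  enum Kind { EQUALS, NOT_EQUALS, IN }

  /** Drops NULL literals (modeled here as Java null) instead of mapping them to "null". */
  static List<String> extractNonNullLiterals(List<String> literals) {
    List<String> values = new ArrayList<>();
    for (String literal : literals) {
      if (literal != null) {
        values.add(literal);
      }
    }
    return values;
  }

  static boolean matchesString(List<String> literals, String actual, Kind kind) {
    List<String> values = extractNonNullLiterals(literals);
    if (values.isEmpty()) {
      // No literals at all: stay permissive, as the existing code does.
      // Only NULL literals: match nothing rather than everything.
      return literals.isEmpty();
    }
    switch (kind) {
      case EQUALS:
      case IN:
        return values.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
      case NOT_EQUALS:
        return values.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
      default:
        return true;
    }
  }

  public static void main(String[] args) {
    // A NULL literal no longer matches a table that is literally named "null".
    System.out.println(matchesString(Arrays.asList((String) null), "null", Kind.EQUALS));
    // A real string literal "null" still matches that table.
    System.out.println(matchesString(Arrays.asList("null"), "null", Kind.EQUALS));
    // NULL entries inside an IN list are ignored; the remaining values still apply.
    System.out.println(matchesString(Arrays.asList("orders", null), "Orders", Kind.IN));
  }
}
```

Whether a NULL-only predicate should match nothing, as here, or be rejected with an error is a design decision for the PR author; the sketch only shows that the two cases can be distinguished without the "null" sentinel.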



##########
pinot-common/src/main/java/org/apache/pinot/common/systemtable/provider/TablesSystemTableProvider.java:
##########
@@ -0,0 +1,850 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.systemtable.provider;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.systemtable.SystemTableDataProvider;
+import org.apache.pinot.common.systemtable.SystemTableResponseUtils;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.systemtable.SystemTable;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Basic system table exposing table-level metadata populated from the broker 
{@link TableCache}.
+ */
+@SystemTable
+public final class TablesSystemTableProvider implements 
SystemTableDataProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesSystemTableProvider.class);
+  public static final String TABLE_NAME = "system.tables";
+  private static final String PINOT_ADMIN_CLIENT_CLASS_NAME = 
"org.apache.pinot.client.admin.PinotAdminClient";
+  private static final String SIZE_CACHE_TTL_MS_PROPERTY = 
"pinot.systemtable.tables.sizeCacheTtlMs";
+  private static final long DEFAULT_SIZE_CACHE_TTL_MS = 
Duration.ofMinutes(1).toMillis();
+  private static final long SIZE_CACHE_TTL_MS = 
getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY,
+      DEFAULT_SIZE_CACHE_TTL_MS);
+
+  private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = 
"pinot.systemtable.tables.controllerTimeoutMs";
+  private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = 
Duration.ofSeconds(5).toMillis();
+  private static final long CONTROLLER_TIMEOUT_MS = 
getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY,
+      DEFAULT_CONTROLLER_TIMEOUT_MS);
+
+  private static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = 
"pinot.admin.request.timeout.ms";
+  private static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
+
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension("tableName", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("type", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("status", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("segments", FieldSpec.DataType.INT)
+      .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG)
+      .addMetric("reportedSize", FieldSpec.DataType.LONG)
+      .addMetric("estimatedSize", FieldSpec.DataType.LONG)
+      .addSingleValueDimension("storageTier", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING)
+      .addSingleValueDimension("replicas", FieldSpec.DataType.INT)
+      .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING)
+      .build();
+
+  private static final @Nullable Class<?> PINOT_ADMIN_CLIENT_CLASS = 
loadAdminClientClass();
+
+  private final TableCache _tableCache;
+  private final @Nullable HelixAdmin _helixAdmin;
+  private final @Nullable String _clusterName;
+  private final @Nullable Function<String, TableSize> 
_tableSizeFetcherOverride;
+  private final List<String> _staticControllerUrls;
+  private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>();
+  private final Map<String, AutoCloseable> _adminClientCache = new 
ConcurrentHashMap<>();
+  private final AtomicBoolean _loggedSizeFetchFailure = new AtomicBoolean();
+
+  public TablesSystemTableProvider() {
+    this(null, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache) {
+    this(tableCache, null, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin) {
+    this(tableCache, helixAdmin, null, null, null);
+  }
+
+  public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin,
+      @Nullable String clusterName) {
+    this(tableCache, helixAdmin, clusterName, null, null);
+  }
+
+  TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin 
helixAdmin, @Nullable String clusterName,
+      @Nullable Function<String, TableSize> tableSizeFetcherOverride, 
@Nullable List<String> controllerUrls) {
+    _tableCache = tableCache;
+    _helixAdmin = helixAdmin;
+    _clusterName = clusterName;
+    _tableSizeFetcherOverride = tableSizeFetcherOverride;
+    _staticControllerUrls = controllerUrls != null ? new 
ArrayList<>(controllerUrls) : List.of();
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return SCHEMA;
+  }
+
+  @Override
+  public TableConfig getTableConfig() {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    for (Map.Entry<String, AutoCloseable> entry : 
_adminClientCache.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOGGER.debug("Failed to close admin client for {}: {}", 
entry.getKey(), e.toString());
+      }
+    }
+    _adminClientCache.clear();
+  }
+
+  @Override
+  public BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery) {
+    List<String> projectionColumns = getProjectionColumns(pinotQuery);
+    if (_tableCache == null) {
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, List.of(), 0);
+    }
+    Set<String> tableNamesWithType = new LinkedHashSet<>();
+    for (String tableName : _tableCache.getTableNameMap().values()) {
+      if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
+        tableNamesWithType.add(tableName);
+      }
+    }
+    List<String> sortedTableNames = new ArrayList<>(tableNamesWithType);
+    sortedTableNames.sort(Comparator.naturalOrder());
+
+    int offset = Math.max(0, pinotQuery.getOffset());
+    int limit = pinotQuery.getLimit();
+    boolean hasLimit = limit > 0;
+    org.apache.pinot.common.request.Expression filterExpression = 
pinotQuery.getFilterExpression();
+    List<String> controllerBaseUrls = getControllerBaseUrls();
+    Function<String, TableSize> sizeFetcher = getSizeFetcher();
+
+    if (filterExpression == null) {
+      int totalRows = sortedTableNames.size();
+      int startIndex = Math.min(offset, totalRows);
+      int endIndex = totalRows;
+      if (limit == 0) {
+        endIndex = startIndex;
+      } else if (hasLimit) {
+        endIndex = Math.min(totalRows, startIndex + limit);
+      }
+      List<GenericRow> rows = new ArrayList<>(Math.max(0, endIndex - 
startIndex));
+      for (int i = startIndex; i < endIndex; i++) {
+        String tableNameWithType = sortedTableNames.get(i);
+        TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+        if (tableType == null) {
+          continue;
+        }
+        String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+        TableStats stats = buildStats(tableNameWithType, tableType, 
sizeFetcher, controllerBaseUrls);
+        rows.add(buildRow(rawTableName, stats));
+      }
+      return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+    }
+
+    int totalRows = 0;
+    int initialCapacity = hasLimit ? Math.min(sortedTableNames.size(), limit) 
: sortedTableNames.size();
+    List<GenericRow> rows = new ArrayList<>(initialCapacity);
+    for (String tableNameWithType : sortedTableNames) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        continue;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      TableStats stats = buildStats(tableNameWithType, tableType, sizeFetcher, 
controllerBaseUrls);
+      if (!matchesFilter(filterExpression, stats, rawTableName)) {
+        continue;
+      }
+      totalRows++;
+      if (totalRows <= offset) {
+        continue;
+      }
+      if (limit == 0) {
+        continue;
+      }
+      if (hasLimit && rows.size() >= limit) {
+        continue;
+      }
+      rows.add(buildRow(rawTableName, stats));
+    }
+    return SystemTableResponseUtils.buildBrokerResponse(TABLE_NAME, SCHEMA, 
projectionColumns, rows, totalRows);
+  }
+
+  private GenericRow buildRow(String rawTableName, TableStats stats) {
+    GenericRow row = new GenericRow();
+    row.putValue("tableName", rawTableName);
+    row.putValue("type", stats._type);
+    row.putValue("status", stats._status);
+    row.putValue("segments", stats._segments);
+    row.putValue("totalDocs", stats._totalDocs);
+    row.putValue("reportedSize", stats._reportedSizeInBytes);
+    row.putValue("estimatedSize", stats._estimatedSizeInBytes);
+    row.putValue("storageTier", stats._storageTier);
+    row.putValue("brokerTenant", stats._brokerTenant);
+    row.putValue("serverTenant", stats._serverTenant);
+    row.putValue("replicas", stats._replicas);
+    row.putValue("tableConfig", stats._tableConfig);
+    return row;
+  }
+
+  private TableStats buildStats(String tableNameWithType, TableType tableType,
+      @Nullable Function<String, TableSize> tableSizeFetcher, List<String> 
controllerBaseUrls) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    int segments = 0;
+    long totalDocs = 0;
+    long reportedSize = 0;
+    long estimatedSize = 0;
+    String tierValue = "";
+    String brokerTenant = "";
+    String serverTenant = "";
+    int replicas = 0;
+    if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+      brokerTenant = tableConfig.getTenantConfig().getBroker();
+      serverTenant = tableConfig.getTenantConfig().getServer();
+    }
+    if (tableConfig != null && tableConfig.getValidationConfig() != null) {
+      Integer repl = tableConfig.getValidationConfig().getReplicationNumber();
+      replicas = repl != null ? repl : replicas;
+    }
+    // Use controller API only
+    TableSize sizeFromController = fetchTableSize(tableNameWithType, 
tableSizeFetcher, controllerBaseUrls);
+    if (sizeFromController != null) {
+      if (sizeFromController._reportedSizeInBytes >= 0) {
+        reportedSize = sizeFromController._reportedSizeInBytes;
+      }
+      if (sizeFromController._estimatedSizeInBytes >= 0) {
+        estimatedSize = sizeFromController._estimatedSizeInBytes;
+      }
+      segments = getSegmentCount(sizeFromController, tableType);
+      totalDocs = getTotalDocs(sizeFromController, tableType);
+    }
+    String status = (tableConfig != null || segments > 0) ? "ONLINE" : 
"UNKNOWN";
+    String tableConfigJson = "";
+    if (tableConfig != null) {
+      try {
+        tableConfigJson = JsonUtils.objectToString(tableConfig);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to serialize table config for {}: {}", 
tableNameWithType, e.toString());
+        tableConfigJson = tableConfig.toString();
+      }
+    }
+    return new TableStats(tableType.name(), status, segments, totalDocs, 
reportedSize, estimatedSize, tierValue,
+        tableConfigJson, brokerTenant, serverTenant, replicas);
+  }
+
+  private boolean matchesFilter(@Nullable 
org.apache.pinot.common.request.Expression filterExpression, TableStats stats,
+      String rawTableName) {
+    if (filterExpression == null) {
+      return true;
+    }
+    org.apache.pinot.common.request.Function function = 
filterExpression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    FilterKind filterKind = toFilterKind(function.getOperator());
+    if (filterKind == null) {
+      return true;
+    }
+    switch (filterKind) {
+      case AND:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (!matchesFilter(child, stats, rawTableName)) {
+            return false;
+          }
+        }
+        return true;
+      case OR:
+        for (org.apache.pinot.common.request.Expression child : 
function.getOperands()) {
+          if (matchesFilter(child, stats, rawTableName)) {
+            return true;
+          }
+        }
+        return false;
+      case NOT:
+        if (function.getOperandsSize() == 0) {
+          return true;
+        }
+        return !matchesFilter(function.getOperands().get(0), stats, 
rawTableName);
+      default:
+        return matchesLeafFilter(filterKind, function.getOperands(), stats, 
rawTableName);
+    }
+  }
+
+  private boolean matchesLeafFilter(FilterKind filterKind,
+      List<org.apache.pinot.common.request.Expression> operands, TableStats 
stats, String rawTableName) {
+    String column = extractIdentifier(operands);
+    if (column == null) {
+      return true;
+    }
+    List<String> values = extractLiteralValues(operands);
+    if (values.isEmpty()) {
+      return true;
+    }
+    switch (column.toLowerCase(Locale.ROOT)) {
+      case "tablename":
+        return matchesString(values, rawTableName, filterKind);
+      case "type":
+        return matchesString(values, stats._type, filterKind);
+      case "status":
+        return matchesString(values, stats._status, filterKind);
+      case "segments":
+        return matchesNumber(values, stats._segments, filterKind);
+      case "reportedsize":
+        return matchesNumber(values, stats._reportedSizeInBytes, filterKind);
+      case "estimatedsize":
+        return matchesNumber(values, stats._estimatedSizeInBytes, filterKind);
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesString(List<String> candidates, String actual, 
FilterKind filterKind) {
+    switch (filterKind) {
+      case EQUALS:
+      case IN:
+        return candidates.stream().anyMatch(v -> v.equalsIgnoreCase(actual));
+      case NOT_EQUALS:
+        return candidates.stream().noneMatch(v -> v.equalsIgnoreCase(actual));
+      default:
+        return true;
+    }
+  }
+
+  private boolean matchesNumber(List<String> candidates, long actual, 
FilterKind filterKind) {
+    try {
+      switch (filterKind) {
+        case EQUALS:
+          return candidates.stream().anyMatch(v -> Long.parseLong(v) == 
actual);
+        case NOT_EQUALS:
+          return candidates.stream().noneMatch(v -> Long.parseLong(v) == 
actual);
+        case GREATER_THAN:
+          return candidates.stream().anyMatch(v -> actual > Long.parseLong(v));
+        case GREATER_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual >= 
Long.parseLong(v));
+        case LESS_THAN:
+          return candidates.stream().anyMatch(v -> actual < Long.parseLong(v));
+        case LESS_THAN_OR_EQUAL:
+          return candidates.stream().anyMatch(v -> actual <= 
Long.parseLong(v));
+        case IN:
+          for (String candidate : candidates) {
+            if (actual == Long.parseLong(candidate)) {
+              return true;
+            }
+          }
+          return false;
+        case RANGE:
+        case BETWEEN:
+          if (candidates.size() >= 2) {
+            long lower = Long.parseLong(candidates.get(0));
+            long upper = Long.parseLong(candidates.get(1));
+            return actual >= lower && actual <= upper;
+          }
+          return true;
+        default:
+          return true;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.debug("Failed to parse numeric filter value {}: {}", candidates, 
e.toString());
+      return true;
+    }
+  }
+
+  private @Nullable String 
extractIdentifier(List<org.apache.pinot.common.request.Expression> operands) {
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Identifier identifier = 
operand.getIdentifier();
+      if (identifier != null) {
+        return identifier.getName();
+      }
+    }
+    return null;
+  }
+
+  private List<String> 
extractLiteralValues(List<org.apache.pinot.common.request.Expression> operands) 
{
+    List<String> values = new ArrayList<>();
+    for (org.apache.pinot.common.request.Expression operand : operands) {
+      org.apache.pinot.common.request.Literal literal = operand.getLiteral();
+      if (literal != null) {
+        if (literal.getSetField() == 
org.apache.pinot.common.request.Literal._Fields.NULL_VALUE) {
+          values.add("null");
+          continue;
+        }
+        try {
+          values.add(RequestUtils.getLiteralString(literal));
+        } catch (Exception e) {
+          values.add(String.valueOf(RequestUtils.getLiteralValue(literal)));
+        }
+      }
+    }
+    return values;
+  }
+
+  private @Nullable FilterKind toFilterKind(String operator) {
+    String normalized = operator.toUpperCase(Locale.ROOT);
+    switch (normalized) {
+      case "EQ":
+        return FilterKind.EQUALS;
+      case "NEQ":
+        return FilterKind.NOT_EQUALS;
+      case "GT":
+        return FilterKind.GREATER_THAN;
+      case "GTE":
+        return FilterKind.GREATER_THAN_OR_EQUAL;
+      case "LT":
+        return FilterKind.LESS_THAN;
+      case "LTE":
+        return FilterKind.LESS_THAN_OR_EQUAL;
+      default:
+        try {
+          return FilterKind.valueOf(normalized);
+        } catch (Exception e) {
+          return null;
+        }
+    }
+  }
+
+  private @Nullable TableSize fetchTableSize(String tableNameWithType,
+      @Nullable Function<String, TableSize> fetcher, List<String> 
controllerBaseUrls) {
+    boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0;
+    TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null;
+    if (cached != null) {
+      return cached;
+    }
+    if (fetcher != null) {
+      try {
+        TableSize fetched = fetcher.apply(tableNameWithType);
+        if (fetched != null) {
+          if (cacheEnabled) {
+            cacheSize(tableNameWithType, fetched);
+          }
+          return fetched;
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, 
e.toString());
+      }
+    }
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName);
+    if (size == null) {
+      size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType);
+      if (size == null) {
+        logSizeFetchFailure("{}: failed to fetch size for {} from controllers 
{} (tried '{}' and '{}')", TABLE_NAME,
+            tableNameWithType, controllerBaseUrls, rawTableName, 
tableNameWithType);
+      }
+    }
+    if (size != null && cacheEnabled) {
+      cacheSize(tableNameWithType, size);
+    }
+    return size;
+  }
+
+  private @Nullable TableSize fetchTableSizeForName(List<String> 
controllerBaseUrls, String tableName) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    for (String baseUrl : controllerBaseUrls) {
+      try {
+        AutoCloseable adminClient = getOrCreateAdminClient(baseUrl);
+        if (adminClient == null) {
+          continue;
+        }
+
+        Object sizeNode;
+        try {
+          sizeNode = clientClass.getMethod("getTableSize", String.class, 
boolean.class, boolean.class)
+              .invoke(adminClient, tableName, true, false);
+        } catch (NoSuchMethodException e) {
+          Object tableClient = 
clientClass.getMethod("getTableClient").invoke(adminClient);
+          sizeNode = tableClient.getClass().getMethod("getTableSize", 
String.class, boolean.class, boolean.class)
+              .invoke(tableClient, tableName, true, false);
+        }
+
+        if (sizeNode == null) {
+          continue;
+        }
+
+        TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), 
TableSize.class);
+        LOGGER.debug("{}: controller size response for {} via {} -> segments 
offline={}, realtime={}, "
+                + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, 
baseUrl,
+            parsed._offlineSegments != null && 
parsed._offlineSegments._segments != null
+                ? parsed._offlineSegments._segments.size() : 0,
+            parsed._realtimeSegments != null && 
parsed._realtimeSegments._segments != null
+                ? parsed._realtimeSegments._segments.size() : 0,
+            parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes);
+        return parsed;
+      } catch (Exception e) {
+        logSizeFetchFailure("{}: error fetching table size for {} via {} using 
admin client", TABLE_NAME, tableName,
+            baseUrl, e);
+      }
+    }
+    return null;
+  }
+
+  private List<String> getControllerBaseUrls() {
+    Set<String> urls = new LinkedHashSet<>();
+    if (_helixAdmin != null) {
+      for (String controller : discoverControllersFromHelix()) {
+        String normalized = normalizeControllerUrl(controller);
+        if (normalized != null) {
+          urls.add(normalized);
+        }
+      }
+    }
+    for (String url : _staticControllerUrls) {
+      String normalized = normalizeControllerUrl(url);
+      if (normalized != null) {
+        urls.add(normalized);
+      }
+    }
+    return new ArrayList<>(urls);
+  }
+
+  private int getSegmentCount(TableSize sizeFromController, TableType 
tableType) {
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.size();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.size();
+    }
+    return 0;
+  }
+
+  private long getTotalDocs(TableSize sizeFromController, TableType tableType) 
{
+    if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments 
!= null
+        && sizeFromController._offlineSegments._segments != null) {
+      return sizeFromController._offlineSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    if (tableType == TableType.REALTIME && 
sizeFromController._realtimeSegments != null
+        && sizeFromController._realtimeSegments._segments != null) {
+      return sizeFromController._realtimeSegments._segments.values().stream()
+          .mapToLong(segmentSize -> segmentSize._totalDocs).sum();
+    }
+    return 0;
+  }
+
+  private @Nullable Function<String, TableSize> getSizeFetcher() {
+    if (_tableSizeFetcherOverride != null) {
+      return _tableSizeFetcherOverride;
+    }
+    return null;
+  }
+
+  private List<String> discoverControllersFromHelix() {
+    List<String> urls = new ArrayList<>();
+    try {
+      if (_clusterName == null) {
+        LOGGER.warn("Cannot discover controllers without cluster name");
+        return List.of();
+      }
+      for (String controllerId : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+        if (!InstanceTypeUtils.isController(controllerId)) {
+          continue;
+        }
+        int firstUnderscore = controllerId.indexOf('_');
+        int lastUnderscore = controllerId.lastIndexOf('_');
+        if (firstUnderscore > 0 && lastUnderscore > firstUnderscore && 
lastUnderscore < controllerId.length() - 1) {
+          String host = controllerId.substring(firstUnderscore + 1, 
lastUnderscore);
+          String port = controllerId.substring(lastUnderscore + 1);
+          if (!host.isEmpty() && !port.isEmpty()) {
+            urls.add(host + ":" + port);
+          } else {
+            LOGGER.warn("Unable to parse controller address from instance id 
(empty host or port): {}", controllerId);
+          }
+        } else {
+          LOGGER.warn("Unable to parse controller address from instance id 
'{}' (expected 'Controller_<host>_<port>')",
+              controllerId);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to discover controllers from Helix", e);
+    }
+    return urls;
+  }
+
+  private @Nullable AutoCloseable getOrCreateAdminClient(String 
controllerBaseUrl) {
+    Class<?> clientClass = PINOT_ADMIN_CLIENT_CLASS;
+    if (clientClass == null) {
+      return null;
+    }
+    String normalized = normalizeControllerUrl(controllerBaseUrl);
+    if (normalized == null) {
+      return null;
+    }
+    return _adminClientCache.computeIfAbsent(normalized, controller -> {
+      try {
+        String controllerAddress = stripScheme(controller);
+        Properties properties = new Properties();
+        properties.setProperty(ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, 
String.valueOf(CONTROLLER_TIMEOUT_MS));
+        properties.setProperty(ADMIN_TRANSPORT_SCHEME, 
controller.startsWith("https://") ? "https" : "http");

Review Comment:
   The names `ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS` and 
`ADMIN_TRANSPORT_SCHEME` do not make it clear that these constants are 
property keys for `PinotAdminClient`. Consider renaming them to something 
like `PINOT_ADMIN_REQUEST_TIMEOUT_MS_PROPERTY_KEY` and 
`PINOT_ADMIN_SCHEME_PROPERTY_KEY` so that call sites read unambiguously.
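
   A minimal sketch of how the rename might look at the call site, assuming the constants are plain `String` keys passed through `java.util.Properties`. The class name `AdminClientPropertyKeys`, the helper `buildProperties`, and both key values are hypothetical placeholders; the actual key strings are whatever the PR already defines.

```java
import java.util.Properties;

final class AdminClientPropertyKeys {
  // Placeholder values for illustration only; the real key strings come from the PR.
  static final String PINOT_ADMIN_REQUEST_TIMEOUT_MS_PROPERTY_KEY = "example.admin.requestTimeoutMs";
  static final String PINOT_ADMIN_SCHEME_PROPERTY_KEY = "example.admin.scheme";

  private AdminClientPropertyKeys() {
  }

  // Mirrors the property setup in the diff, using the renamed constants.
  static Properties buildProperties(String controllerUrl, long timeoutMs) {
    Properties properties = new Properties();
    properties.setProperty(PINOT_ADMIN_REQUEST_TIMEOUT_MS_PROPERTY_KEY, String.valueOf(timeoutMs));
    properties.setProperty(PINOT_ADMIN_SCHEME_PROPERTY_KEY,
        controllerUrl.startsWith("https://") ? "https" : "http");
    return properties;
  }
}
```

   With the `_PROPERTY_KEY` suffix, a reader of `setProperty(...)` can tell at a glance that the first argument is a key name rather than a timeout or scheme value.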



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

