Copilot commented on code in PR #17842:
URL: https://github.com/apache/pinot/pull/17842#discussion_r2906718144


##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java:
##########
@@ -54,15 +69,26 @@ protected Context preprocess(Properties periodicTaskProperties) {
 
   @Override
   protected void processTable(String tableNameWithType, Context context) {
-    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    if (tableConfig == null) {
-      LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
-      return;
+    Set<String> brokerInstances;
+    if (TableNameBuilder.isTableResource(tableNameWithType)) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation",
+            tableNameWithType);
+        return;
+      }
+      brokerInstances = _pinotHelixResourceManager
+          .getAllInstancesForBrokerTenant(context._instanceConfigs, tableConfig.getTenantConfig().getBroker());
+    } else {
+      LogicalTableConfig logicalTableConfig = _pinotHelixResourceManager.getLogicalTableConfig(tableNameWithType);
+      if (logicalTableConfig == null) {
+        LOGGER.warn("Failed to find logical table config for: {}, skipping broker resource validation",
+            tableNameWithType);
+        return;
+      }
+      brokerInstances = _pinotHelixResourceManager
+          .getAllInstancesForBrokerTenant(context._instanceConfigs, logicalTableConfig.getBrokerTenant());

Review Comment:
   Using `TableNameBuilder.isTableResource(...)` to decide between physical and 
logical tables can misclassify a logical table whose name ends with 
`_OFFLINE`/`_REALTIME`, causing it to be treated as a physical table and 
skipped (no table config found). Prefer determining the type by config presence 
(e.g., attempt the table config lookup first; if absent, fall back to the 
logical table config) rather than relying on name patterns.
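
   A minimal sketch of the config-presence approach, assuming two hypothetical 
map-backed lookups that stand in for the physical and logical config stores 
(each maps a partition name to its broker tenant). `BrokerTenantResolver` and 
`Kind` are illustrative names, not Pinot APIs:
   ```java
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical stand-in: each map goes from a broker-resource partition name to its
   // broker tenant, playing the role of the TableConfig / LogicalTableConfig lookups.
   final class BrokerTenantResolver {
     enum Kind { PHYSICAL, LOGICAL, UNKNOWN }

     private final Map<String, String> _physicalConfigs;
     private final Map<String, String> _logicalConfigs;

     BrokerTenantResolver(Map<String, String> physicalConfigs, Map<String, String> logicalConfigs) {
       _physicalConfigs = physicalConfigs;
       _logicalConfigs = logicalConfigs;
     }

     // Classify by which config exists (physical first), not by the _OFFLINE/_REALTIME suffix.
     Kind classify(String partitionName) {
       if (_physicalConfigs.containsKey(partitionName)) {
         return Kind.PHYSICAL;
       }
       if (_logicalConfigs.containsKey(partitionName)) {
         return Kind.LOGICAL;
       }
       return Kind.UNKNOWN;
     }

     // Broker tenant to validate against, or empty when the partition should be skipped.
     Optional<String> brokerTenantFor(String partitionName) {
       switch (classify(partitionName)) {
         case PHYSICAL:
           return Optional.of(_physicalConfigs.get(partitionName));
         case LOGICAL:
           return Optional.of(_logicalConfigs.get(partitionName));
         default:
           return Optional.empty();
       }
     }
   }
   ```
   With this shape, a logical table named `orders_OFFLINE` resolves as LOGICAL 
when only a logical config exists, while a name with no config of either kind 
falls through to UNKNOWN and can keep the existing warn-and-return behavior.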



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -748,6 +748,25 @@ public List<String> getAllTables() {
     return getAllResources().stream().filter(TableNameBuilder::isTableResource).collect(Collectors.toList());
   }
 
+  /**
+   * Returns partition names in broker resource ideal state that are logical tables (not physical _OFFLINE/_REALTIME)
+   * and have a logical table config. Used by broker resource validation to repair logical table broker assignments.
+   */
+  public List<String> getBrokerResourceLogicalTablePartitions() {
+    IdealState brokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
+    if (brokerIdealState == null) {
+      return Collections.emptyList();
+    }
+    List<String> logicalPartitions = new ArrayList<>();
+    for (String partition : brokerIdealState.getPartitionSet()) {
+      if (!TableNameBuilder.isTableResource(partition)
+          && ZKMetadataProvider.getLogicalTableConfig(_propertyStore, partition) != null) {
+        logicalPartitions.add(partition);
+      }
+    }

Review Comment:
   `getBrokerResourceLogicalTablePartitions()` currently excludes partitions 
that *look* like physical tables by name (`isTableResource`). This can drop 
valid logical tables whose names match the physical naming convention (e.g., 
end with `_OFFLINE`). Consider identifying logical partitions by checking 
configs instead (e.g., if `getTableConfig(...)` is null and 
`getLogicalTableConfig(...)` is non-null, treat as logical), rather than 
relying on name heuristics.
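
   A sketch of config-based filtering, where `hasTableConfig` and 
`hasLogicalTableConfig` are hypothetical predicates standing in for 
`getTableConfig(...) != null` and `getLogicalTableConfig(...) != null`:
   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.function.Predicate;

   // Hypothetical helper; the predicates stand in for the ZK config lookups.
   final class LogicalPartitionFilter {
     private LogicalPartitionFilter() {
     }

     static List<String> logicalPartitions(Collection<String> brokerResourcePartitions,
         Predicate<String> hasTableConfig, Predicate<String> hasLogicalTableConfig) {
       List<String> logicalPartitions = new ArrayList<>();
       for (String partition : brokerResourcePartitions) {
         // Logical when no physical config exists but a logical one does,
         // regardless of whether the name ends with _OFFLINE or _REALTIME.
         if (!hasTableConfig.test(partition) && hasLogicalTableConfig.test(partition)) {
           logicalPartitions.add(partition);
         }
       }
       return logicalPartitions;
     }
   }
   ```
   Partitions with no config of either kind are still excluded, as they are with 
the current name-based check.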



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java:
##########
@@ -45,6 +49,17 @@ public BrokerResourceValidationManager(ControllerConf config, PinotHelixResource
         controllerMetrics);
   }
 
+  @Override
+  protected List<String> getTablesToProcess(Properties periodicTaskProperties) {
+    List<String> tables = super.getTablesToProcess(periodicTaskProperties);
+    if (periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME) != null) {
+      return tables;
+    }
+    List<String> combined = new ArrayList<>(tables);
+    combined.addAll(_pinotHelixResourceManager.getBrokerResourceLogicalTablePartitions());
+    return combined;
+  }
+
   @Override

Review Comment:
   This adds logical-table partitions to the work list, but 
`ControllerPeriodicTask` will still filter via `shouldProcessTable()` (default: 
`_leadControllerManager.isLeaderForTable(tableNameWithType)`). If table 
leadership is only tracked for physical tables, logical partitions may be 
filtered out and never processed. Consider overriding `shouldProcessTable` in 
`BrokerResourceValidationManager` to ensure logical partitions are processed 
(e.g., return `true` for logical partitions, while retaining the leader check 
for physical tables).
   ```suggestion
     @Override
     protected boolean shouldProcessTable(String tableNameWithType) {
       // Always process logical-table partitions; retain leader-based filtering for physical tables.
       if (!TableNameBuilder.isTableResource(tableNameWithType)) {
         return true;
       }
       return super.shouldProcessTable(tableNameWithType);
     }
   
     @Override
   ```
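
   A standalone model of the filtering effect this comment describes, assuming 
(as the comment does) that leadership is tracked only for physical tables. The 
led-table and logical-partition sets are hypothetical stand-ins, and 
`leaderOrLogical` keys on a config-derived set of logical partitions rather than 
on `isTableResource`:
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   // Hypothetical model of ControllerPeriodicTask filtering the work list through shouldProcessTable().
   final class WorkListFilterDemo {
     private WorkListFilterDemo() {
     }

     static List<String> filter(List<String> tablesToProcess, Predicate<String> shouldProcessTable) {
       return tablesToProcess.stream().filter(shouldProcessTable).collect(Collectors.toList());
     }

     // Default behavior: keep only tables this controller leads.
     static Predicate<String> leaderOnly(Set<String> ledTables) {
       return ledTables::contains;
     }

     // Overridden behavior: known logical partitions bypass the leader check.
     static Predicate<String> leaderOrLogical(Set<String> ledTables, Set<String> logicalPartitions) {
       return table -> logicalPartitions.contains(table) || ledTables.contains(table);
     }
   }
   ```
   Under the default predicate, `my_logical_table` is silently dropped from the 
work list; under the override it is kept alongside the led physical table.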



##########
pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link BrokerResourceValidationManager}, including that getTablesToProcess
+ * returns both physical table names and logical table partition names (Issue #15751).
+ */
+public class BrokerResourceValidationManagerTest {
+
+  private static final String PHYSICAL_TABLE = "myTable_OFFLINE";
+  private static final String LOGICAL_TABLE_PARTITION = "my_logical_table";
+
+  private PinotHelixResourceManager _resourceManager;
+  private BrokerResourceValidationManager _validationManager;
+
+  @BeforeMethod
+  public void setUp() {
+    _resourceManager = mock(PinotHelixResourceManager.class);
+    when(_resourceManager.getAllTables()).thenReturn(List.of(PHYSICAL_TABLE));
+    when(_resourceManager.getBrokerResourceLogicalTablePartitions()).thenReturn(List.of(LOGICAL_TABLE_PARTITION));
+
+    ControllerConf config = new ControllerConf();
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    _validationManager = new BrokerResourceValidationManager(config, _resourceManager, leadControllerManager,
+        controllerMetrics);
+  }
+
+  /**
+   * Verifies that getTablesToProcess returns both physical tables (from getAllTables) and
+   * logical table partitions (from getBrokerResourceLogicalTablePartitions) so that the
+   * periodic task validates and repairs broker resource for logical tables too.
+   */
+  @Test
+  public void testGetTablesToProcessIncludesLogicalTablePartitions()
+      throws Exception {
+    Method getTablesToProcess =
+        BrokerResourceValidationManager.class.getDeclaredMethod("getTablesToProcess", Properties.class);
+    getTablesToProcess.setAccessible(true);
+
+    @SuppressWarnings("unchecked")
+    List<String> tables = (List<String>) getTablesToProcess.invoke(_validationManager, new Properties());

Review Comment:
   The test uses reflection to invoke `getTablesToProcess`, but the method is 
`protected` and the test is in the same package 
(`org.apache.pinot.controller.validation`), so it can be called directly 
without reflection. Calling it directly will make the test less brittle (e.g., 
no dependence on method name strings or reflective access).
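
   A minimal sketch with hypothetical stand-in classes showing why the 
reflection is unnecessary: same-package code can invoke a `protected` method 
directly. In the actual test, the reflective lines would reduce to 
`List<String> tables = _validationManager.getTablesToProcess(new Properties());`.
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Properties;

   // Hypothetical stand-in for the periodic-task base class.
   class BasePeriodicTask {
     protected List<String> getTablesToProcess(Properties periodicTaskProperties) {
       return List.of("myTable_OFFLINE");
     }
   }

   // Hypothetical stand-in for BrokerResourceValidationManager's override.
   class ValidationTask extends BasePeriodicTask {
     @Override
     protected List<String> getTablesToProcess(Properties periodicTaskProperties) {
       List<String> combined = new ArrayList<>(super.getTablesToProcess(periodicTaskProperties));
       combined.add("my_logical_table");
       return combined;
     }
   }

   class ValidationTaskTest {
     // Same package as ValidationTask, so the protected method is directly callable.
     static List<String> tablesToProcess() {
       return new ValidationTask().getTablesToProcess(new Properties());
     }
   }
   ```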



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1150,16 +1189,42 @@ public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithTy
   }
 
   private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) {
-    IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE);
-    for (String tableNameWithType : tableIdealState.getPartitionSet()) {
-      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-      Preconditions.checkNotNull(tableConfig);
-      String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
-      if (brokerTag.equals(brokerTenantTag)) {
-        tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerResourceStateModel.ONLINE);
+    // Use atomic read-modify-write so updates (including for logical tables) are persisted and not lost to races.
+    HelixHelper.updateIdealState(getHelixZkManager(), Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+      assert idealState != null;
+      for (String partitionName : idealState.getPartitionSet()) {
+        String brokerTag = resolveBrokerTagForBrokerResourcePartition(partitionName);
+        if (brokerTag == null) {
+          continue;
+        }
+        if (brokerTag.equals(brokerTenantTag)) {
+          idealState.setPartitionState(partitionName, instanceName, BrokerResourceStateModel.ONLINE);
+        }
       }
+      return idealState;
+    }, DEFAULT_RETRY_POLICY);
+  }
+
+  /**
+   * Resolves the broker tag for a partition in the broker resource. Tries physical table config first,
+   * then logical table config, so both are handled regardless of partition naming (e.g. a logical table
+   * name ending with _OFFLINE would otherwise be misclassified as physical and skipped).
+   *
+   * @param partitionName partition name in broker ideal state (physical table name with type or logical table name)
+   * @return broker tag for the partition, or null if the partition cannot be resolved (unknown or missing config)
+   */
+  private String resolveBrokerTagForBrokerResourcePartition(String partitionName) {
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, partitionName);
+    if (tableConfig != null) {
+      return TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
+    }
+    LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, partitionName);
+    if (logicalTableConfig != null) {
+      return TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
     }
-    _helixAdmin.setResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+    LOGGER.warn("Skipping partition {} in broker resource: no table config or logical table config found",

Review Comment:
   Logging a WARN for every unresolved broker-resource partition when a broker 
instance is added can create noisy logs (especially if partitions transiently 
exist without configs during table drops or updates). Consider lowering this to 
DEBUG, or rate-limiting/aggregating (e.g., logging once per call with a count), 
to avoid spamming controller logs.
   ```suggestion
       LOGGER.debug("Skipping partition {} in broker resource: no table config or logical table config found",
   ```
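
   A sketch of the aggregate-once alternative, assuming a hypothetical 
`resolveBrokerTag` function that behaves like 
`resolveBrokerTagForBrokerResourcePartition` (returning null for unresolvable 
partitions). It uses `java.util.logging` only to stay self-contained, whereas 
the controller code uses its own `LOGGER`:
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Function;
   import java.util.logging.Logger;

   // Hypothetical helper: collects unresolved partitions and reports them in one summary line.
   final class BrokerTagAssigner {
     private static final Logger LOGGER = Logger.getLogger(BrokerTagAssigner.class.getName());

     private BrokerTagAssigner() {
     }

     // Returns the partitions whose broker tag matches brokerTenantTag.
     static List<String> matchingPartitions(Iterable<String> partitions, String brokerTenantTag,
         Function<String, String> resolveBrokerTag) {
       List<String> matching = new ArrayList<>();
       List<String> unresolved = new ArrayList<>();
       for (String partition : partitions) {
         String brokerTag = resolveBrokerTag.apply(partition);
         if (brokerTag == null) {
           unresolved.add(partition);
         } else if (brokerTag.equals(brokerTenantTag)) {
           matching.add(partition);
         }
       }
       // One line per call instead of one line per unresolved partition.
       if (!unresolved.isEmpty()) {
         LOGGER.warning(String.format(
             "Skipped %d broker resource partition(s) with no table config or logical table config: %s",
             unresolved.size(), unresolved));
       }
       return matching;
     }
   }
   ```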



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1102,21 +1121,41 @@ private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String brokerT
 
   public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType)
       throws Exception {
-    TableConfig tableConfig;
-    try {
-      tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
-      throw new InvalidTableConfigException(
-          "Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
-    }
-    if (tableConfig == null) {
-      LOGGER.warn("Table {} does not exist", tableNameWithType);
-      throw new InvalidConfigException(
-          "Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
+    Set<String> brokerInstances;
+    if (TableNameBuilder.isTableResource(tableNameWithType)) {
+      TableConfig tableConfig;
+      try {
+        tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
+        throw new InvalidTableConfigException(
+            "Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
+      }
+      if (tableConfig == null) {
+        LOGGER.warn("Table {} does not exist", tableNameWithType);
+        throw new InvalidConfigException(
+            "Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
+      }
+      brokerInstances = getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker());
+    } else {
+      // Logical table
+      LogicalTableConfig logicalTableConfig;

Review Comment:
   `rebuildBrokerResourceFromHelixTags` uses 
`isTableResource(tableNameWithType)` to branch. This has the same 
misclassification problem as above: a logical table named with a physical 
suffix will go down the physical path and throw `InvalidConfigException` even 
though a logical table config exists. Consider branching by config existence 
(try physical `getTableConfig`; if null then try `getLogicalTableConfig`) 
rather than by name.
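
   A sketch of branching by config existence for the rebuild path, using 
hypothetical map-backed lookups (name to broker tenant) and a stand-in 
exception type in place of `InvalidConfigException`:
   ```java
   import java.util.Map;

   // Hypothetical stand-in for the physical/logical config lookups used by the rebuild path.
   final class BrokerTenantLookup {
     static final class TableNotFoundException extends Exception {
       TableNotFoundException(String message) {
         super(message);
       }
     }

     private final Map<String, String> _tableConfigs;
     private final Map<String, String> _logicalTableConfigs;

     BrokerTenantLookup(Map<String, String> tableConfigs, Map<String, String> logicalTableConfigs) {
       _tableConfigs = tableConfigs;
       _logicalTableConfigs = logicalTableConfigs;
     }

     // Physical config first, then logical; fail only when neither exists,
     // so a logical table named "x_OFFLINE" is not rejected.
     String brokerTenantForRebuild(String name) throws TableNotFoundException {
       String brokerTenant = _tableConfigs.get(name);
       if (brokerTenant != null) {
         return brokerTenant;
       }
       brokerTenant = _logicalTableConfigs.get(name);
       if (brokerTenant != null) {
         return brokerTenant;
       }
       throw new TableNotFoundException("Table or logical table " + name + " does not exist");
     }
   }
   ```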



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

