This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a52aa07480 Enhance BrokerResourceValidationManager to support logical 
table partitions in resource validation (#17842)
6a52aa07480 is described below

commit 6a52aa07480e5c9e6019977263ce76f57e676599
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue Mar 17 19:12:23 2026 +0530

    Enhance BrokerResourceValidationManager to support logical table partitions 
in resource validation (#17842)
---
 .../pinot/common/utils/helix/HelixHelper.java      | 22 +++--
 .../helix/core/PinotHelixResourceManager.java      | 77 ++++++++++++-----
 .../core/periodictask/ControllerPeriodicTask.java  | 26 ++++--
 .../BrokerResourceValidationManager.java           | 34 ++++++--
 .../PinotHelixResourceManagerStatelessTest.java    | 69 +++++++++++++++
 .../BrokerResourceValidationManagerTest.java       | 98 ++++++++++++++++++++++
 .../validation/ValidationManagerStatelessTest.java | 60 +++++++++++++
 .../ControllerPeriodicTasksIntegrationTest.java    | 50 +++++++++++
 8 files changed, 392 insertions(+), 44 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index a052e6bee75..03955696130 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -40,6 +40,7 @@ import 
org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.pinot.common.helix.ExtraInstanceConfig;
@@ -48,6 +49,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.version.PinotVersion;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
@@ -534,24 +536,26 @@ public class HelixHelper {
   }
 
   public static Set<String> getTablesForBrokerTag(HelixManager helixManager, 
String brokerTag) {
-    Set<String> tablesForBrokerTag = new HashSet<>();
-    List<TableConfig> tableConfigs = 
ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
-    for (TableConfig tableConfig : tableConfigs) {
-      if 
(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()).equals(brokerTag))
 {
-        tablesForBrokerTag.add(tableConfig.getTableName());
-      }
-    }
-    return tablesForBrokerTag;
+    return getTablesForBrokerTags(helixManager, List.of(brokerTag));
   }
 
   public static Set<String> getTablesForBrokerTags(HelixManager helixManager, 
List<String> brokerTags) {
     Set<String> tablesForBrokerTags = new HashSet<>();
-    List<TableConfig> tableConfigs = 
ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
helixManager.getHelixPropertyStore();
+    List<TableConfig> tableConfigs = 
ZKMetadataProvider.getAllTableConfigs(propertyStore);
     for (TableConfig tableConfig : tableConfigs) {
       if 
(brokerTags.contains(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker())))
 {
         tablesForBrokerTags.add(tableConfig.getTableName());
       }
     }
+    // Include logical tables that use any of these broker tenants
+    for (LogicalTableConfig logicalTableConfig : 
ZKMetadataProvider.getAllLogicalTableConfigs(propertyStore)) {
+      String logicalBrokerTag =
+          
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
+      if (brokerTags.contains(logicalBrokerTag)) {
+        tablesForBrokerTags.add(logicalTableConfig.getTableName());
+      }
+    }
     return tablesForBrokerTags;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index bca909d70dd..85cb08c1cdf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -748,6 +748,14 @@ public class PinotHelixResourceManager {
     return 
getAllResources().stream().filter(TableNameBuilder::isTableResource).collect(Collectors.toList());
   }
 
+  /**
+   * Returns all logical table names in the cluster. Used by broker resource 
validation to repair logical table
+   * broker assignments (and to add missing logical tables to the broker 
resource when needed).
+   */
+  public List<String> getBrokerResourceLogicalTables() {
+    return getAllLogicalTableNames();
+  }
+
   /**
    * Get all table names (with type suffix) from provided database.
    *
@@ -1101,22 +1109,25 @@ public class PinotHelixResourceManager {
   }
 
   public PinotResourceManagerResponse 
rebuildBrokerResourceFromHelixTags(String tableNameWithType)
-      throws Exception {
-    TableConfig tableConfig;
+  throws Exception {
+    Set<String> brokerInstances;
     try {
-      tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, 
tableNameWithType);
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      if (tableConfig != null) {
+        brokerInstances = 
getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker());
+      } else {
+        LogicalTableConfig logicalTableConfig =
+            ZKMetadataProvider.getLogicalTableConfig(_propertyStore, 
tableNameWithType);
+        Preconditions.checkNotNull(logicalTableConfig, "No table config or 
logical table config found for %s",
+            tableNameWithType);
+        brokerInstances = 
getAllInstancesForBrokerTenant(logicalTableConfig.getBrokerTenant());
+      }
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while getting table config for table {}", 
tableNameWithType, e);
-      throw new InvalidTableConfigException(
-          "Failed to fetch broker tag for table " + tableNameWithType + " due 
to exception: " + e.getMessage());
-    }
-    if (tableConfig == null) {
-      LOGGER.warn("Table {} does not exist", tableNameWithType);
+      LOGGER.warn("Caught exception while getting config for table {}", 
tableNameWithType, e);
       throw new InvalidConfigException(
-          "Invalid table configuration for table " + tableNameWithType + ". 
Table does not exist");
+          "Failed to fetch broker config for table " + tableNameWithType + " 
due to exception: " + e.getMessage());
     }
-    return rebuildBrokerResource(tableNameWithType,
-        
getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker()));
+    return rebuildBrokerResource(tableNameWithType, brokerInstances);
   }
 
   public PinotResourceManagerResponse rebuildBrokerResource(String 
tableNameWithType, Set<String> brokerInstances) {
@@ -1150,16 +1161,37 @@ public class PinotHelixResourceManager {
   }
 
   private void addInstanceToBrokerIdealState(String brokerTenantTag, String 
instanceName) {
-    IdealState tableIdealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, 
Helix.BROKER_RESOURCE_INSTANCE);
-    for (String tableNameWithType : tableIdealState.getPartitionSet()) {
-      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-      Preconditions.checkNotNull(tableConfig);
-      String brokerTag = 
TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
-      if (brokerTag.equals(brokerTenantTag)) {
-        tableIdealState.setPartitionState(tableNameWithType, instanceName, 
BrokerResourceStateModel.ONLINE);
+    // Use atomic read-modify-write so updates (including for logical tables) 
are persisted and not lost to races.
+    HelixHelper.updateIdealState(getHelixZkManager(), 
Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+      Preconditions.checkNotNull(idealState, "Broker ideal state must not be 
null");
+      for (String partitionName : idealState.getPartitionSet()) {
+        String brokerTag = resolveBrokerTagForTable(partitionName);
+        if (brokerTag.equals(brokerTenantTag)) {
+          idealState.setPartitionState(partitionName, instanceName, 
BrokerResourceStateModel.ONLINE);
+        }
       }
+      return idealState;
+    }, DEFAULT_RETRY_POLICY);
+  }
+
+  /**
+   * Resolves the broker tag for a table in the broker resource. Tries 
physical table config first,
+   * then logical table config.
+   *
+   * @param tableName table name in broker ideal state (physical table name 
with type or logical table name)
+   * @return broker tag for the table, or throw exception if the table name 
cannot be resolved
+   */
+  private String resolveBrokerTagForTable(String tableName) {
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableName);
+    if (tableConfig != null) {
+      return TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
     }
-    _helixAdmin.setResourceIdealState(_helixClusterName, 
Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+    if (logicalTableConfig != null) {
+      return 
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
+    }
+    throw new InvalidTableConfigException("Failed to resolve broker tag for 
table " + tableName
+        + " because no table config or logical table config found");
   }
 
   private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String 
brokerTenantTag,
@@ -1886,7 +1918,7 @@ public class PinotHelixResourceManager {
         // Add ideal state with consuming segments from designated stream 
metadata
         _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState, 
streamMetadataList);
         LOGGER.info("Adding table {}: Added consuming segments ideal state 
given the designated stream metadata",
-                tableNameWithType);
+            tableNameWithType);
       }
     } catch (Exception e) {
       LOGGER.error("Caught exception while setting up table: {}, cleaning it 
up", tableNameWithType, e);
@@ -4996,7 +5028,8 @@ public class PinotHelixResourceManager {
    * @throws TableNotFoundException if the specified real-time table does not 
exist.
    * @throws IllegalStateException if the IdealState for the table is not 
found.
    */
-  public WatermarkInductionResult getConsumerWatermarks(String tableName) 
throws TableNotFoundException {
+  public WatermarkInductionResult getConsumerWatermarks(String tableName)
+      throws TableNotFoundException {
     String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     if (!hasRealtimeTable(tableName)) {
       throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index e49266f429b..7d2accc9934 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -64,18 +64,32 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask impleme
     _controllerMetrics = controllerMetrics;
   }
 
+  /**
+   * Returns the list of table names (with type) to consider for this task. 
Subclasses may override to add
+   * more names (e.g. logical table partitions). Default: single table from 
property or all physical tables.
+   */
+  protected List<String> getTablesToProcess(Properties periodicTaskProperties) 
{
+    String propTableNameWithType = (String) 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
+    return propTableNameWithType != null ? List.of(propTableNameWithType)
+        : _pinotHelixResourceManager.getAllTables();
+  }
+
+  /**
+   * Returns whether the given table should be processed by this controller 
run. Default: only if this
+   * controller is leader for the table. Subclasses may override (e.g. to 
always include logical tables).
+   */
+  protected boolean shouldProcessTable(String tableNameWithType) {
+    return _leadControllerManager.isLeaderForTable(tableNameWithType);
+  }
+
   @Override
   protected final void runTask(Properties periodicTaskProperties) {
     _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
     try {
-      // Check if we have a specific table against which this task needs to be 
run.
-      String propTableNameWithType = (String) 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
-      // Process the tables that are managed by this controller
-      List<String> allTables =
-          propTableNameWithType != null ? List.of(propTableNameWithType) : 
_pinotHelixResourceManager.getAllTables();
+      List<String> allTables = getTablesToProcess(periodicTaskProperties);
 
       Set<String> currentLeaderOfTables = allTables.stream()
-          .filter(_leadControllerManager::isLeaderForTable)
+          .filter(this::shouldProcessTable)
           .collect(Collectors.toSet());
 
       if (!currentLeaderOfTables.isEmpty()) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 858b3cae46a..7f0d1dc1564 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.validation;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -27,7 +28,9 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +48,17 @@ public class BrokerResourceValidationManager extends 
ControllerPeriodicTask<Brok
         controllerMetrics);
   }
 
+  @Override
+  protected List<String> getTablesToProcess(Properties periodicTaskProperties) 
{
+    List<String> tables = super.getTablesToProcess(periodicTaskProperties);
+    if (periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME) != 
null) {
+      return tables;
+    }
+    List<String> combined = new ArrayList<>(tables);
+    
combined.addAll(_pinotHelixResourceManager.getBrokerResourceLogicalTables());
+    return combined;
+  }
+
   @Override
   protected Context preprocess(Properties periodicTaskProperties) {
     Context context = new Context();
@@ -55,15 +69,21 @@ public class BrokerResourceValidationManager extends 
ControllerPeriodicTask<Brok
   @Override
   protected void processTable(String tableNameWithType, Context context) {
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    if (tableConfig == null) {
-      LOGGER.warn("Failed to find table config for table: {}, skipping broker 
resource validation", tableNameWithType);
+    if (tableConfig != null) {
+      Set<String> brokerInstances = _pinotHelixResourceManager
+          .getAllInstancesForBrokerTenant(context._instanceConfigs, 
tableConfig.getTenantConfig().getBroker());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, 
brokerInstances);
       return;
     }
-
-    // Rebuild broker resource
-    Set<String> brokerInstances = _pinotHelixResourceManager
-        .getAllInstancesForBrokerTenant(context._instanceConfigs, 
tableConfig.getTenantConfig().getBroker());
-    _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, 
brokerInstances);
+    LogicalTableConfig logicalTableConfig = 
_pinotHelixResourceManager.getLogicalTableConfig(tableNameWithType);
+    if (logicalTableConfig != null) {
+      Set<String> brokerInstances = _pinotHelixResourceManager
+          .getAllInstancesForBrokerTenant(context._instanceConfigs, 
logicalTableConfig.getBrokerTenant());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, 
brokerInstances);
+      return;
+    }
+    LOGGER.warn("No table config or logical table config found for: {}, 
skipping broker resource validation",
+        tableNameWithType);
   }
 
   public static final class Context {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index e6d673b4273..bc643b8c92e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -78,6 +78,7 @@ import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -326,6 +327,65 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
     resetBrokerTags();
   }
 
+  /**
+   * Verifies that when a new broker is added with the same tenant as a 
logical table,
+   * the broker resource ideal state is updated for the logical table 
partition (Issue #15751).
+   */
+  @Test
+  public void testUpdateBrokerResourceWithLogicalTable()
+      throws Exception {
+    untagBrokers();
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 2, 
0, 0);
+    _helixResourceManager.createBrokerTenant(brokerTenant);
+
+    String brokerTag = TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME);
+    List<InstanceConfig> instanceConfigs = 
HelixHelper.getInstanceConfigs(_helixManager);
+    List<String> taggedBrokers = 
HelixHelper.getInstancesWithTag(instanceConfigs, brokerTag);
+    assertEquals(taggedBrokers.size(), 2);
+
+    // Add physical tables (offline + realtime) via manager with test tenants, 
then logical table
+    addDummySchema(RAW_TABLE_NAME);
+    TableConfig offlineTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+            .setServerTenant(SERVER_TENANT_NAME).build();
+    waitForEVToDisappear(offlineTableConfig.getTableName());
+    _helixResourceManager.addTable(offlineTableConfig);
+    waitForEVToDisappear(REALTIME_TABLE_NAME);
+    TableConfig realtimeTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+            .setServerTenant(SERVER_TENANT_NAME)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+    _helixResourceManager.addTable(realtimeTableConfig);
+    List<String> physicalTableNamesWithType = List.of(OFFLINE_TABLE_NAME, 
REALTIME_TABLE_NAME);
+    String logicalTableName = "test_logical_table";
+    addDummySchema(logicalTableName);
+    LogicalTableConfig logicalTableConfig =
+        ControllerTest.getDummyLogicalTableConfig(logicalTableName, 
physicalTableNamesWithType, BROKER_TENANT_NAME);
+    _helixResourceManager.addLogicalTableConfig(logicalTableConfig);
+
+    IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, 
_clusterName);
+    assertTrue(brokerResource.getPartitionSet().contains(logicalTableName));
+    checkBrokerResourceForPartition(logicalTableName, taggedBrokers);
+
+    // Add a new broker instance with same tenant; verify add-broker path 
updates logical table partition
+    Instance newBrokerInstance =
+        new Instance("localhost", 3, InstanceType.BROKER, 
Collections.singletonList(brokerTag), null, 0, 0, 0, 0,
+            false);
+    assertTrue(_helixResourceManager.addInstance(newBrokerInstance, 
true).isSuccessful());
+    String newBrokerId = InstanceUtils.getHelixInstanceId(newBrokerInstance);
+    List<String> taggedBrokersAfterAdd = new ArrayList<>(taggedBrokers);
+    taggedBrokersAfterAdd.add(newBrokerId);
+
+    checkBrokerResourceForPartition(logicalTableName, taggedBrokersAfterAdd);
+
+    // Cleanup
+    _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+    _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+    assertTrue(_helixResourceManager.dropInstance(newBrokerId).isSuccessful());
+    resetBrokerTags();
+  }
+
   private void checkBrokerResource(List<String> expectedBrokers) {
     IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, 
_clusterName);
     assertEquals(brokerResource.getPartitionSet().size(), 1);
@@ -333,6 +393,15 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
     assertEquals(instanceStateMap.keySet(), new HashSet<>(expectedBrokers));
   }
 
+  private void checkBrokerResourceForPartition(String partitionName, 
List<String> expectedBrokers) {
+    IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, 
_clusterName);
+    assertTrue(brokerResource.getPartitionSet().contains(partitionName),
+        "Broker resource should contain partition: " + partitionName);
+    Map<String, String> instanceStateMap = 
brokerResource.getInstanceStateMap(partitionName);
+    assertEquals(instanceStateMap.keySet(), new HashSet<>(expectedBrokers),
+        "Broker set for partition " + partitionName + " should match 
expected");
+  }
+
   @Test
   public void testRebuildBrokerResourceFromHelixTags()
       throws Exception {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java
new file mode 100644
index 00000000000..159b57ef6e0
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link BrokerResourceValidationManager}, including that 
getTablesToProcess
+ * returns both physical table names and logical table partition names (Issue 
#15751).
+ */
+public class BrokerResourceValidationManagerTest {
+
+  private static final String PHYSICAL_TABLE = "myTable_OFFLINE";
+  private static final String LOGICAL_TABLE_PARTITION = "my_logical_table";
+
+  private PinotHelixResourceManager _resourceManager;
+  private BrokerResourceValidationManager _validationManager;
+
+  @BeforeMethod
+  public void setUp() {
+    _resourceManager = mock(PinotHelixResourceManager.class);
+    when(_resourceManager.getAllTables()).thenReturn(List.of(PHYSICAL_TABLE));
+    
when(_resourceManager.getBrokerResourceLogicalTables()).thenReturn(List.of(LOGICAL_TABLE_PARTITION));
+
+    ControllerConf config = new ControllerConf();
+    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
+    ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    _validationManager = new BrokerResourceValidationManager(config, 
_resourceManager, leadControllerManager,
+        controllerMetrics);
+  }
+
+  /**
+   * Verifies that getTablesToProcess returns both physical tables (from 
getAllTables) and
+   * logical table partitions (from getBrokerResourceLogicalTables) so that the
+   * periodic task validates and repairs broker resource for logical tables 
too.
+   */
+  @Test
+  public void testGetTablesToProcessIncludesLogicalTablePartitions() {
+    List<String> tables = _validationManager.getTablesToProcess(new 
Properties());
+
+    assertTrue(tables.contains(PHYSICAL_TABLE), "Should include physical 
table: " + PHYSICAL_TABLE);
+    assertTrue(tables.contains(LOGICAL_TABLE_PARTITION),
+        "Should include logical table partition: " + LOGICAL_TABLE_PARTITION);
+    assertEquals(tables.size(), 2, "Should have exactly physical + logical");
+  }
+
+  @Test
+  public void 
testGetTablesToProcessWithTableNamePropertyReturnsOnlyThatTable() {
+    String singleTable = "singleTable_REALTIME";
+    Properties props = new Properties();
+    props.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME, singleTable);
+
+    List<String> tables = _validationManager.getTablesToProcess(props);
+
+    assertEquals(tables, List.of(singleTable), "When tableName property is 
set, only that table should be returned");
+  }
+
+  @Test
+  public void testGetTablesToProcessWhenNoLogicalPartitions() {
+    
when(_resourceManager.getBrokerResourceLogicalTables()).thenReturn(Collections.emptyList());
+
+    List<String> tables = _validationManager.getTablesToProcess(new 
Properties());
+
+    assertEquals(tables, List.of(PHYSICAL_TABLE));
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
index f66230c6d12..0d119a6fbd1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
@@ -18,15 +18,19 @@
  */
 package org.apache.pinot.controller.validation;
 
+import java.util.List;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -100,6 +104,62 @@ public class ValidationManagerStatelessTest extends 
ControllerTest {
         
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
   }
 
+  /**
+   * Verifies that rebuildBrokerResourceFromHelixTags works for a logical 
table partition when a new broker
+   * is added manually and the ideal state is out of sync (Issue #15751).
+   */
+  @Test
+  public void testRebuildBrokerResourceWhenBrokerAddedForLogicalTable()
+      throws Exception {
+    // Add realtime table so we can create a logical table with both offline 
and realtime
+    TableConfig realtimeTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+    _helixResourceManager.addTable(realtimeTableConfig);
+
+    String logicalTableName = "test_logical_rebuild";
+    addDummySchema(logicalTableName);
+    List<String> physicalTableNamesWithType =
+        List.of(TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME),
+            TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME));
+    LogicalTableConfig logicalTableConfig =
+        ControllerTest.getDummyLogicalTableConfig(logicalTableName, 
physicalTableNamesWithType,
+            TagNameUtils.DEFAULT_TENANT_NAME);
+    addLogicalTableConfig(logicalTableConfig);
+
+    HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+    IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, 
getHelixClusterName());
+    Assert.assertTrue(idealState.getPartitionSet().contains(logicalTableName));
+
+    // Add a new broker manually so the logical table partition is missing it
+    final String newBrokerId = "Broker_localhost_3";
+    InstanceConfig instanceConfig = new InstanceConfig(newBrokerId);
+    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setHostName("Broker_localhost");
+    instanceConfig.setPort("3");
+    helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+    helixAdmin.addInstanceTag(getHelixClusterName(), 
instanceConfig.getInstanceName(),
+        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, 
getHelixClusterName());
+    Assert.assertFalse(idealState.getInstanceSet(logicalTableName)
+        
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+    _helixResourceManager.rebuildBrokerResourceFromHelixTags(logicalTableName);
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, 
getHelixClusterName());
+    Assert.assertTrue(idealState.getInstanceSet(logicalTableName)
+        
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+    // Cleanup
+    _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+    _helixResourceManager.deleteRealtimeTable(TEST_TABLE_NAME);
+    // Remove the manually added broker instance so that subsequent tests see 
a clean cluster state
+    helixAdmin.removeInstanceTag(getHelixClusterName(), 
instanceConfig.getInstanceName(),
+        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+    instanceConfig.setInstanceEnabled(false);
+    helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
+  }
+
   @AfterClass
   public void tearDown() {
     stopFakeInstances();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 74a97f52f64..a606ca1183d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -422,6 +422,56 @@ public class ControllerPeriodicTasksIntegrationTest 
extends BaseClusterIntegrati
     }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
   }
 
+  /**
+   * Verifies that BrokerResourceValidationManager also repairs broker 
resource for a logical table
+   * when a new broker is added (Issue #15751).
+   */
+  @Test
+  public void testBrokerResourceValidationManagerRepairsLogicalTable()
+      throws IOException {
+    // Add logical table (same broker tenant as physical table)
+    Schema logicalTableSchema = createSchema();
+    logicalTableSchema.setSchemaName(getLogicalTableName());
+    addSchema(logicalTableSchema);
+    createLogicalTable();
+
+    String helixClusterName = getHelixClusterName();
+    String logicalTableName = getLogicalTableName();
+    IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, 
helixClusterName);
+    assertNotNull(idealState);
+    assertTrue(idealState.getPartitionSet().contains(logicalTableName), 
"Broker resource should have logical table");
+
+    // Add a new broker so logical table partition is out of sync
+    String brokerId = "Broker_localhost_5678";
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(brokerId);
+    instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+    _helixAdmin.addInstance(helixClusterName, instanceConfig);
+    Set<String> brokersAfterAdd = 
_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+    assertTrue(brokersAfterAdd.contains(brokerId));
+
+    // Assert logical table partition does not yet contain the new broker 
(periodic task will repair it)
+    idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, 
helixClusterName);
+    assertNotNull(idealState);
+    assertFalse(idealState.getInstanceSet(logicalTableName).contains(brokerId),
+        "Logical table partition should not yet include the new broker before 
periodic task runs");
+
+    // Wait for BrokerResourceValidationManager to repair both physical and 
logical table partitions
+    String tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    TestUtils.waitForCondition(aVoid -> {
+      IdealState is = HelixHelper.getBrokerIdealStates(_helixAdmin, 
helixClusterName);
+      if (is == null) {
+        return false;
+      }
+      return is.getInstanceSet(tableNameWithType).equals(brokersAfterAdd)
+          && is.getInstanceSet(logicalTableName).equals(brokersAfterAdd);
+    }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager to 
repair logical table partition");
+
+    // Cleanup: drop broker, logical table config, and logical table schema
+    _helixAdmin.dropInstance(helixClusterName, instanceConfig);
+    _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+    deleteSchema(logicalTableName);
+  }
+
   @Test
   public void testOfflineSegmentIntervalChecker() {
     OfflineSegmentValidationManager offlineSegmentValidationManager =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to