This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a52aa07480 Enhance BrokerResourceValidationManager to support logical
table partitions in resource validation (#17842)
6a52aa07480 is described below
commit 6a52aa07480e5c9e6019977263ce76f57e676599
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue Mar 17 19:12:23 2026 +0530
Enhance BrokerResourceValidationManager to support logical table partitions
in resource validation (#17842)
---
.../pinot/common/utils/helix/HelixHelper.java | 22 +++--
.../helix/core/PinotHelixResourceManager.java | 77 ++++++++++++-----
.../core/periodictask/ControllerPeriodicTask.java | 26 ++++--
.../BrokerResourceValidationManager.java | 34 ++++++--
.../PinotHelixResourceManagerStatelessTest.java | 69 +++++++++++++++
.../BrokerResourceValidationManagerTest.java | 98 ++++++++++++++++++++++
.../validation/ValidationManagerStatelessTest.java | 60 +++++++++++++
.../ControllerPeriodicTasksIntegrationTest.java | 50 +++++++++++
8 files changed, 392 insertions(+), 44 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index a052e6bee75..03955696130 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -40,6 +40,7 @@ import
org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.pinot.common.helix.ExtraInstanceConfig;
@@ -48,6 +49,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
@@ -534,24 +536,26 @@ public class HelixHelper {
}
public static Set<String> getTablesForBrokerTag(HelixManager helixManager,
String brokerTag) {
- Set<String> tablesForBrokerTag = new HashSet<>();
- List<TableConfig> tableConfigs =
ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
- for (TableConfig tableConfig : tableConfigs) {
- if
(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()).equals(brokerTag))
{
- tablesForBrokerTag.add(tableConfig.getTableName());
- }
- }
- return tablesForBrokerTag;
+ return getTablesForBrokerTags(helixManager, List.of(brokerTag));
}
public static Set<String> getTablesForBrokerTags(HelixManager helixManager,
List<String> brokerTags) {
Set<String> tablesForBrokerTags = new HashSet<>();
- List<TableConfig> tableConfigs =
ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
helixManager.getHelixPropertyStore();
+ List<TableConfig> tableConfigs =
ZKMetadataProvider.getAllTableConfigs(propertyStore);
for (TableConfig tableConfig : tableConfigs) {
if
(brokerTags.contains(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker())))
{
tablesForBrokerTags.add(tableConfig.getTableName());
}
}
+ // Include logical tables that use any of these broker tenants
+ for (LogicalTableConfig logicalTableConfig :
ZKMetadataProvider.getAllLogicalTableConfigs(propertyStore)) {
+ String logicalBrokerTag =
+
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
+ if (brokerTags.contains(logicalBrokerTag)) {
+ tablesForBrokerTags.add(logicalTableConfig.getTableName());
+ }
+ }
return tablesForBrokerTags;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index bca909d70dd..85cb08c1cdf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -748,6 +748,14 @@ public class PinotHelixResourceManager {
return
getAllResources().stream().filter(TableNameBuilder::isTableResource).collect(Collectors.toList());
}
+ /**
+ * Returns all logical table names in the cluster. Used by broker resource
validation to repair logical table
+ * broker assignments (and to add missing logical tables to the broker
resource when needed).
+ */
+ public List<String> getBrokerResourceLogicalTables() {
+ return getAllLogicalTableNames();
+ }
+
/**
* Get all table names (with type suffix) from provided database.
*
@@ -1101,22 +1109,25 @@ public class PinotHelixResourceManager {
}
public PinotResourceManagerResponse
rebuildBrokerResourceFromHelixTags(String tableNameWithType)
- throws Exception {
- TableConfig tableConfig;
+ throws Exception {
+ Set<String> brokerInstances;
try {
- tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore,
tableNameWithType);
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ if (tableConfig != null) {
+ brokerInstances =
getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker());
+ } else {
+ LogicalTableConfig logicalTableConfig =
+ ZKMetadataProvider.getLogicalTableConfig(_propertyStore,
tableNameWithType);
+ Preconditions.checkNotNull(logicalTableConfig, "No table config or
logical table config found for %s",
+ tableNameWithType);
+ brokerInstances =
getAllInstancesForBrokerTenant(logicalTableConfig.getBrokerTenant());
+ }
} catch (Exception e) {
- LOGGER.warn("Caught exception while getting table config for table {}",
tableNameWithType, e);
- throw new InvalidTableConfigException(
- "Failed to fetch broker tag for table " + tableNameWithType + " due
to exception: " + e.getMessage());
- }
- if (tableConfig == null) {
- LOGGER.warn("Table {} does not exist", tableNameWithType);
+ LOGGER.warn("Caught exception while getting config for table {}",
tableNameWithType, e);
throw new InvalidConfigException(
- "Invalid table configuration for table " + tableNameWithType + ".
Table does not exist");
+ "Failed to fetch broker config for table " + tableNameWithType + "
due to exception: " + e.getMessage());
}
- return rebuildBrokerResource(tableNameWithType,
-
getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker()));
+ return rebuildBrokerResource(tableNameWithType, brokerInstances);
}
public PinotResourceManagerResponse rebuildBrokerResource(String
tableNameWithType, Set<String> brokerInstances) {
@@ -1150,16 +1161,37 @@ public class PinotHelixResourceManager {
}
private void addInstanceToBrokerIdealState(String brokerTenantTag, String
instanceName) {
- IdealState tableIdealState =
_helixAdmin.getResourceIdealState(_helixClusterName,
Helix.BROKER_RESOURCE_INSTANCE);
- for (String tableNameWithType : tableIdealState.getPartitionSet()) {
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkNotNull(tableConfig);
- String brokerTag =
TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
- if (brokerTag.equals(brokerTenantTag)) {
- tableIdealState.setPartitionState(tableNameWithType, instanceName,
BrokerResourceStateModel.ONLINE);
+ // Use atomic read-modify-write so updates (including for logical tables)
are persisted and not lost to races.
+ HelixHelper.updateIdealState(getHelixZkManager(),
Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+ Preconditions.checkNotNull(idealState, "Broker ideal state must not be
null");
+ for (String partitionName : idealState.getPartitionSet()) {
+ String brokerTag = resolveBrokerTagForTable(partitionName);
+ if (brokerTag.equals(brokerTenantTag)) {
+ idealState.setPartitionState(partitionName, instanceName,
BrokerResourceStateModel.ONLINE);
+ }
}
+ return idealState;
+ }, DEFAULT_RETRY_POLICY);
+ }
+
+ /**
+ * Resolves the broker tag for a table in the broker resource. Tries
physical table config first,
+ * then logical table config.
+ *
+ * @param tableName table name in broker ideal state (physical table name
with type or logical table name)
+ * @return broker tag for the table, or throw exception if the table name
cannot be resolved
+ */
+ private String resolveBrokerTagForTable(String tableName) {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableName);
+ if (tableConfig != null) {
+ return TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
}
- _helixAdmin.setResourceIdealState(_helixClusterName,
Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+ LogicalTableConfig logicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+ if (logicalTableConfig != null) {
+ return
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
+ }
+ throw new InvalidTableConfigException("Failed to resolve broker tag for
table " + tableName
+ + " because no table config or logical table config found");
}
private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String
brokerTenantTag,
@@ -1886,7 +1918,7 @@ public class PinotHelixResourceManager {
// Add ideal state with consuming segments from designated stream
metadata
_pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState,
streamMetadataList);
LOGGER.info("Adding table {}: Added consuming segments ideal state
given the designated stream metadata",
- tableNameWithType);
+ tableNameWithType);
}
} catch (Exception e) {
LOGGER.error("Caught exception while setting up table: {}, cleaning it
up", tableNameWithType, e);
@@ -4996,7 +5028,8 @@ public class PinotHelixResourceManager {
* @throws TableNotFoundException if the specified real-time table does not
exist.
* @throws IllegalStateException if the IdealState for the table is not
found.
*/
- public WatermarkInductionResult getConsumerWatermarks(String tableName)
throws TableNotFoundException {
+ public WatermarkInductionResult getConsumerWatermarks(String tableName)
+ throws TableNotFoundException {
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
if (!hasRealtimeTable(tableName)) {
throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index e49266f429b..7d2accc9934 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -64,18 +64,32 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask impleme
_controllerMetrics = controllerMetrics;
}
+ /**
+ * Returns the list of table names (with type) to consider for this task.
Subclasses may override to add
+ * more names (e.g. logical table partitions). Default: single table from
property or all physical tables.
+ */
+ protected List<String> getTablesToProcess(Properties periodicTaskProperties)
{
+ String propTableNameWithType = (String)
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
+ return propTableNameWithType != null ? List.of(propTableNameWithType)
+ : _pinotHelixResourceManager.getAllTables();
+ }
+
+ /**
+ * Returns whether the given table should be processed by this controller
run. Default: only if this
+ * controller is leader for the table. Subclasses may override (e.g. to
always include logical tables).
+ */
+ protected boolean shouldProcessTable(String tableNameWithType) {
+ return _leadControllerManager.isLeaderForTable(tableNameWithType);
+ }
+
@Override
protected final void runTask(Properties periodicTaskProperties) {
_controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
try {
- // Check if we have a specific table against which this task needs to be
run.
- String propTableNameWithType = (String)
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
- // Process the tables that are managed by this controller
- List<String> allTables =
- propTableNameWithType != null ? List.of(propTableNameWithType) :
_pinotHelixResourceManager.getAllTables();
+ List<String> allTables = getTablesToProcess(periodicTaskProperties);
Set<String> currentLeaderOfTables = allTables.stream()
- .filter(_leadControllerManager::isLeaderForTable)
+ .filter(this::shouldProcessTable)
.collect(Collectors.toSet());
if (!currentLeaderOfTables.isEmpty()) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 858b3cae46a..7f0d1dc1564 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.validation;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -27,7 +28,9 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +48,17 @@ public class BrokerResourceValidationManager extends
ControllerPeriodicTask<Brok
controllerMetrics);
}
+ @Override
+ protected List<String> getTablesToProcess(Properties periodicTaskProperties)
{
+ List<String> tables = super.getTablesToProcess(periodicTaskProperties);
+ if (periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME) !=
null) {
+ return tables;
+ }
+ List<String> combined = new ArrayList<>(tables);
+
combined.addAll(_pinotHelixResourceManager.getBrokerResourceLogicalTables());
+ return combined;
+ }
+
@Override
protected Context preprocess(Properties periodicTaskProperties) {
Context context = new Context();
@@ -55,15 +69,21 @@ public class BrokerResourceValidationManager extends
ControllerPeriodicTask<Brok
@Override
protected void processTable(String tableNameWithType, Context context) {
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping broker
resource validation", tableNameWithType);
+ if (tableConfig != null) {
+ Set<String> brokerInstances = _pinotHelixResourceManager
+ .getAllInstancesForBrokerTenant(context._instanceConfigs,
tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
return;
}
-
- // Rebuild broker resource
- Set<String> brokerInstances = _pinotHelixResourceManager
- .getAllInstancesForBrokerTenant(context._instanceConfigs,
tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
+ LogicalTableConfig logicalTableConfig =
_pinotHelixResourceManager.getLogicalTableConfig(tableNameWithType);
+ if (logicalTableConfig != null) {
+ Set<String> brokerInstances = _pinotHelixResourceManager
+ .getAllInstancesForBrokerTenant(context._instanceConfigs,
logicalTableConfig.getBrokerTenant());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
+ return;
+ }
+ LOGGER.warn("No table config or logical table config found for: {},
skipping broker resource validation",
+ tableNameWithType);
}
public static final class Context {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index e6d673b4273..bc643b8c92e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -78,6 +78,7 @@ import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -326,6 +327,65 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
resetBrokerTags();
}
+ /**
+ * Verifies that when a new broker is added with the same tenant as a
logical table,
+ * the broker resource ideal state is updated for the logical table
partition (Issue #15751).
+ */
+ @Test
+ public void testUpdateBrokerResourceWithLogicalTable()
+ throws Exception {
+ untagBrokers();
+ Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 2,
0, 0);
+ _helixResourceManager.createBrokerTenant(brokerTenant);
+
+ String brokerTag = TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME);
+ List<InstanceConfig> instanceConfigs =
HelixHelper.getInstanceConfigs(_helixManager);
+ List<String> taggedBrokers =
HelixHelper.getInstancesWithTag(instanceConfigs, brokerTag);
+ assertEquals(taggedBrokers.size(), 2);
+
+ // Add physical tables (offline + realtime) via manager with test tenants,
then logical table
+ addDummySchema(RAW_TABLE_NAME);
+ TableConfig offlineTableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+ .setServerTenant(SERVER_TENANT_NAME).build();
+ waitForEVToDisappear(offlineTableConfig.getTableName());
+ _helixResourceManager.addTable(offlineTableConfig);
+ waitForEVToDisappear(REALTIME_TABLE_NAME);
+ TableConfig realtimeTableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+ .setServerTenant(SERVER_TENANT_NAME)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+ _helixResourceManager.addTable(realtimeTableConfig);
+ List<String> physicalTableNamesWithType = List.of(OFFLINE_TABLE_NAME,
REALTIME_TABLE_NAME);
+ String logicalTableName = "test_logical_table";
+ addDummySchema(logicalTableName);
+ LogicalTableConfig logicalTableConfig =
+ ControllerTest.getDummyLogicalTableConfig(logicalTableName,
physicalTableNamesWithType, BROKER_TENANT_NAME);
+ _helixResourceManager.addLogicalTableConfig(logicalTableConfig);
+
+ IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin,
_clusterName);
+ assertTrue(brokerResource.getPartitionSet().contains(logicalTableName));
+ checkBrokerResourceForPartition(logicalTableName, taggedBrokers);
+
+ // Add a new broker instance with same tenant; verify add-broker path
updates logical table partition
+ Instance newBrokerInstance =
+ new Instance("localhost", 3, InstanceType.BROKER,
Collections.singletonList(brokerTag), null, 0, 0, 0, 0,
+ false);
+ assertTrue(_helixResourceManager.addInstance(newBrokerInstance,
true).isSuccessful());
+ String newBrokerId = InstanceUtils.getHelixInstanceId(newBrokerInstance);
+ List<String> taggedBrokersAfterAdd = new ArrayList<>(taggedBrokers);
+ taggedBrokersAfterAdd.add(newBrokerId);
+
+ checkBrokerResourceForPartition(logicalTableName, taggedBrokersAfterAdd);
+
+ // Cleanup
+ _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+ _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+ assertTrue(_helixResourceManager.dropInstance(newBrokerId).isSuccessful());
+ resetBrokerTags();
+ }
+
private void checkBrokerResource(List<String> expectedBrokers) {
IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin,
_clusterName);
assertEquals(brokerResource.getPartitionSet().size(), 1);
@@ -333,6 +393,15 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
assertEquals(instanceStateMap.keySet(), new HashSet<>(expectedBrokers));
}
+ private void checkBrokerResourceForPartition(String partitionName,
List<String> expectedBrokers) {
+ IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin,
_clusterName);
+ assertTrue(brokerResource.getPartitionSet().contains(partitionName),
+ "Broker resource should contain partition: " + partitionName);
+ Map<String, String> instanceStateMap =
brokerResource.getInstanceStateMap(partitionName);
+ assertEquals(instanceStateMap.keySet(), new HashSet<>(expectedBrokers),
+ "Broker set for partition " + partitionName + " should match
expected");
+ }
+
@Test
public void testRebuildBrokerResourceFromHelixTags()
throws Exception {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java
new file mode 100644
index 00000000000..159b57ef6e0
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link BrokerResourceValidationManager}, including that
getTablesToProcess
+ * returns both physical table names and logical table partition names (Issue
#15751).
+ */
+public class BrokerResourceValidationManagerTest {
+
+ private static final String PHYSICAL_TABLE = "myTable_OFFLINE";
+ private static final String LOGICAL_TABLE_PARTITION = "my_logical_table";
+
+ private PinotHelixResourceManager _resourceManager;
+ private BrokerResourceValidationManager _validationManager;
+
+ @BeforeMethod
+ public void setUp() {
+ _resourceManager = mock(PinotHelixResourceManager.class);
+ when(_resourceManager.getAllTables()).thenReturn(List.of(PHYSICAL_TABLE));
+
when(_resourceManager.getBrokerResourceLogicalTables()).thenReturn(List.of(LOGICAL_TABLE_PARTITION));
+
+ ControllerConf config = new ControllerConf();
+ LeadControllerManager leadControllerManager =
mock(LeadControllerManager.class);
+ ControllerMetrics controllerMetrics = new
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ _validationManager = new BrokerResourceValidationManager(config,
_resourceManager, leadControllerManager,
+ controllerMetrics);
+ }
+
+ /**
+ * Verifies that getTablesToProcess returns both physical tables (from
getAllTables) and
+ * logical table partitions (from getBrokerResourceLogicalTables) so that the
+ * periodic task validates and repairs broker resource for logical tables
too.
+ */
+ @Test
+ public void testGetTablesToProcessIncludesLogicalTablePartitions() {
+ List<String> tables = _validationManager.getTablesToProcess(new
Properties());
+
+ assertTrue(tables.contains(PHYSICAL_TABLE), "Should include physical
table: " + PHYSICAL_TABLE);
+ assertTrue(tables.contains(LOGICAL_TABLE_PARTITION),
+ "Should include logical table partition: " + LOGICAL_TABLE_PARTITION);
+ assertEquals(tables.size(), 2, "Should have exactly physical + logical");
+ }
+
+ @Test
+ public void
testGetTablesToProcessWithTableNamePropertyReturnsOnlyThatTable() {
+ String singleTable = "singleTable_REALTIME";
+ Properties props = new Properties();
+ props.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME, singleTable);
+
+ List<String> tables = _validationManager.getTablesToProcess(props);
+
+ assertEquals(tables, List.of(singleTable), "When tableName property is
set, only that table should be returned");
+ }
+
+ @Test
+ public void testGetTablesToProcessWhenNoLogicalPartitions() {
+
when(_resourceManager.getBrokerResourceLogicalTables()).thenReturn(Collections.emptyList());
+
+ List<String> tables = _validationManager.getTablesToProcess(new
Properties());
+
+ assertEquals(tables, List.of(PHYSICAL_TABLE));
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
index f66230c6d12..0d119a6fbd1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java
@@ -18,15 +18,19 @@
*/
package org.apache.pinot.controller.validation;
+import java.util.List;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -100,6 +104,62 @@ public class ValidationManagerStatelessTest extends
ControllerTest {
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
}
+ /**
+ * Verifies that rebuildBrokerResourceFromHelixTags works for a logical
table partition when a new broker
+ * is added manually and the ideal state is out of sync (Issue #15751).
+ */
+ @Test
+ public void testRebuildBrokerResourceWhenBrokerAddedForLogicalTable()
+ throws Exception {
+ // Add realtime table so we can create a logical table with both offline
and realtime
+ TableConfig realtimeTableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+ _helixResourceManager.addTable(realtimeTableConfig);
+
+ String logicalTableName = "test_logical_rebuild";
+ addDummySchema(logicalTableName);
+ List<String> physicalTableNamesWithType =
+ List.of(TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME),
+ TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME));
+ LogicalTableConfig logicalTableConfig =
+ ControllerTest.getDummyLogicalTableConfig(logicalTableName,
physicalTableNamesWithType,
+ TagNameUtils.DEFAULT_TENANT_NAME);
+ addLogicalTableConfig(logicalTableConfig);
+
+ HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+ IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin,
getHelixClusterName());
+ Assert.assertTrue(idealState.getPartitionSet().contains(logicalTableName));
+
+ // Add a new broker manually so the logical table partition is missing it
+ final String newBrokerId = "Broker_localhost_3";
+ InstanceConfig instanceConfig = new InstanceConfig(newBrokerId);
+ instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setHostName("Broker_localhost");
+ instanceConfig.setPort("3");
+ helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+ helixAdmin.addInstanceTag(getHelixClusterName(),
instanceConfig.getInstanceName(),
+ TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+
+ idealState = HelixHelper.getBrokerIdealStates(helixAdmin,
getHelixClusterName());
+ Assert.assertFalse(idealState.getInstanceSet(logicalTableName)
+
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+ _helixResourceManager.rebuildBrokerResourceFromHelixTags(logicalTableName);
+ idealState = HelixHelper.getBrokerIdealStates(helixAdmin,
getHelixClusterName());
+ Assert.assertTrue(idealState.getInstanceSet(logicalTableName)
+
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+ // Cleanup
+ _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+ _helixResourceManager.deleteRealtimeTable(TEST_TABLE_NAME);
+ // Remove the manually added broker instance so that subsequent tests see
a clean cluster state
+ helixAdmin.removeInstanceTag(getHelixClusterName(),
instanceConfig.getInstanceName(),
+ TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+ instanceConfig.setInstanceEnabled(false);
+ helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
+ }
+
@AfterClass
public void tearDown() {
stopFakeInstances();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 74a97f52f64..a606ca1183d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -422,6 +422,56 @@ public class ControllerPeriodicTasksIntegrationTest
extends BaseClusterIntegrati
}, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
}
+ /**
+ * Verifies that BrokerResourceValidationManager also repairs broker
resource for a logical table
+ * when a new broker is added (Issue #15751).
+ */
+ @Test
+ public void testBrokerResourceValidationManagerRepairsLogicalTable()
+ throws IOException {
+ // Add logical table (same broker tenant as physical table)
+ Schema logicalTableSchema = createSchema();
+ logicalTableSchema.setSchemaName(getLogicalTableName());
+ addSchema(logicalTableSchema);
+ createLogicalTable();
+
+ String helixClusterName = getHelixClusterName();
+ String logicalTableName = getLogicalTableName();
+ IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin,
helixClusterName);
+ assertNotNull(idealState);
+ assertTrue(idealState.getPartitionSet().contains(logicalTableName),
"Broker resource should have logical table");
+
+ // Add a new broker so logical table partition is out of sync
+ String brokerId = "Broker_localhost_5678";
+ InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(brokerId);
+ instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+ _helixAdmin.addInstance(helixClusterName, instanceConfig);
+ Set<String> brokersAfterAdd =
_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+ assertTrue(brokersAfterAdd.contains(brokerId));
+
+ // Assert logical table partition does not yet contain the new broker
(periodic task will repair it)
+ idealState = HelixHelper.getBrokerIdealStates(_helixAdmin,
helixClusterName);
+ assertNotNull(idealState);
+ assertFalse(idealState.getInstanceSet(logicalTableName).contains(brokerId),
+ "Logical table partition should not yet include the new broker before
periodic task runs");
+
+ // Wait for BrokerResourceValidationManager to repair both physical and
logical table partitions
+ String tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+ TestUtils.waitForCondition(aVoid -> {
+ IdealState is = HelixHelper.getBrokerIdealStates(_helixAdmin,
helixClusterName);
+ if (is == null) {
+ return false;
+ }
+ return is.getInstanceSet(tableNameWithType).equals(brokersAfterAdd)
+ && is.getInstanceSet(logicalTableName).equals(brokersAfterAdd);
+ }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager to
repair logical table partition");
+
+ // Cleanup: drop broker, logical table config, and logical table schema
+ _helixAdmin.dropInstance(helixClusterName, instanceConfig);
+ _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+ deleteSchema(logicalTableName);
+ }
+
@Test
public void testOfflineSegmentIntervalChecker() {
OfflineSegmentValidationManager offlineSegmentValidationManager =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]