This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ec90dc16f1e Logical tables / Table Config validation refactoring (#17761)
ec90dc16f1e is described below
commit ec90dc16f1e77046b294ce3c0f25b6d554f72f20
Author: Krishan Goyal <[email protected]>
AuthorDate: Fri Feb 27 15:21:14 2026 +0530
Logical tables / Table Config validation refactoring (#17761)
* Move ZNRecord SerDe into TableConfig and LogicalTableConfig
Move fromZNRecord/toZNRecord methods from TableConfigSerDeUtils and
LogicalTableConfigUtils into the config classes themselves. This makes
it easier to extend these classes — subclasses can override
serialization/deserialization behavior directly.
- Add helix-core dependency to pinot-spi for ZNRecord access
- Add fromZNRecord()/toZNRecord() to TableConfig and LogicalTableConfig
- Delete TableConfigSerDeUtils (all callers updated)
- Remove fromZNRecord/toZNRecord from LogicalTableConfigUtils (validation stays)
- Update all callers across 15 files to use the new methods directly
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Spotless fixes
* Introduce ConfigRecord to decouple pinot-spi from helix-core
Replace ZNRecord-based SerDe on TableConfig and LogicalTableConfig with
a helix-agnostic ConfigRecord POJO in pinot-spi. Bridge utilities in
pinot-common (TableConfigSerDeUtils, LogicalTableConfigUtils) handle
ZNRecord conversion, keeping helix-core out of the SPI layer.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Spotless fixes
* Code cleanup
* Refactor fromConfigRecord to make it more extensible
* Add Javadoc
* Initial refactor of logical table / table validations
* Initial refactor of logical table / table validations
* Make logical table ser/de pluggable
* Checkstyle fixes
* Cleanup code
* Move broker, tenant, and schema validation to resource manager.
* Cleanup dead code
* Test case fixes
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../common/utils/LogicalTableConfigUtils.java | 28 +-------
.../api/resources/TableConfigValidationUtils.java | 2 +
.../helix/core/PinotHelixResourceManager.java | 82 +++++++++++++++-------
.../resources/PinotLogicalTableResourceTest.java | 4 +-
.../java/org/apache/pinot/spi/data/Schema.java | 11 +++
5 files changed, 75 insertions(+), 52 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index edf6b038aff..1f319981e95 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -23,13 +23,10 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.ws.rs.core.HttpHeaders;
import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -54,11 +51,7 @@ public class LogicalTableConfigUtils {
return
LogicalTableConfigSerDeProvider.getInstance().toZNRecord(logicalTableConfig);
}
- public static void validateLogicalTableConfig(
- LogicalTableConfig logicalTableConfig,
- Predicate<String> physicalTableExistsPredicate,
- Predicate<String> brokerTenantExistsPredicate,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public static void validateLogicalTableConfig(LogicalTableConfig
logicalTableConfig) {
String tableName = logicalTableConfig.getTableName();
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Invalid logical table name. Reason:
'tableName' should not be null or empty");
@@ -100,12 +93,6 @@ public class LogicalTableConfigUtils {
+ "' should have the same database name as logical table: "
+ databaseName + " != "
+ physicalTableDatabaseName);
}
-
- // validate physical table exists
- if (!physicalTableExistsPredicate.test(physicalTableName)) {
- throw new IllegalArgumentException(
- "Invalid logical table. Reason: '" + physicalTableName + "'
should be one of the existing tables");
- }
}
if (TableNameBuilder.isOfflineTableResource(physicalTableName)) {
@@ -175,19 +162,6 @@ public class LogicalTableConfigUtils {
"Invalid logical table. Reason: 'quota.storage' should not be set
for logical table");
}
- // validate broker tenant exists
- String brokerTenant = logicalTableConfig.getBrokerTenant();
- if (!brokerTenantExistsPredicate.test(brokerTenant)) {
- throw new IllegalArgumentException(
- "Invalid logical table. Reason: '" + brokerTenant + "' should be one
of the existing broker tenants");
- }
-
- // Validate schema with same name as logical table exists
- if (!ZKMetadataProvider.isSchemaExists(propertyStore, tableName)) {
- throw new IllegalArgumentException(
- "Invalid logical table. Reason: Schema with same name as logical
table '" + tableName + "' does not exist");
- }
-
// validate time boundary config is not null for hybrid tables
TimeBoundaryConfig timeBoundaryConfig =
logicalTableConfig.getTimeBoundaryConfig();
if (logicalTableConfig.isHybridLogicalTable() && timeBoundaryConfig ==
null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
index 1b3d1c0cfe7..72e93e6d048 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
@@ -63,6 +63,8 @@ public final class TableConfigValidationUtils {
checkHybridTableConfig(resourceManager, tableConfig);
TaskConfigUtils.validateTaskConfigs(tableConfig, schema, taskManager,
typesToSkip);
validateInstanceAssignment(resourceManager, tableConfig);
+ resourceManager.validateTableTenantConfig(tableConfig);
+ resourceManager.validateTableTaskMinionInstanceTagConfig(tableConfig);
}
private static void checkHybridTableConfig(PinotHelixResourceManager
resourceManager, TableConfig tableConfig) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7caa81d9d79..d0f204087b0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -174,6 +174,7 @@ import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -1901,20 +1902,12 @@ public class PinotHelixResourceManager {
String tableName = logicalTableConfig.getTableName();
LOGGER.info("Adding logical table {}: Start", tableName);
- validateLogicalTableConfig(logicalTableConfig);
-
- // Check if the logical table name is already used
- if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
- throw new TableAlreadyExistsException("Logical table: " + tableName + "
already exists");
+ if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+ logicalTableConfig.setBrokerTenant("DefaultTenant");
}
- // Check if the table name is already used by a physical table
- PinotHelixPropertyStoreZnRecordProvider
pinotHelixPropertyStoreZnRecordProvider =
- PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore);
- if
(pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
- ||
pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
{
- throw new TableAlreadyExistsException("Table name: " + tableName + "
already exists");
- }
+ validateNewLogicalTableConfig(logicalTableConfig);
+ validatePhysicalTablesExist(logicalTableConfig);
LOGGER.info("Adding logical table {}: Creating logical table config in the
property store", tableName);
ZKMetadataProvider.setLogicalTableConfig(_propertyStore,
logicalTableConfig);
@@ -1932,7 +1925,7 @@ public class PinotHelixResourceManager {
* these parameters must be specified in the table config.
*/
@VisibleForTesting
- void validateTableTenantConfig(TableConfig tableConfig) {
+ public void validateTableTenantConfig(TableConfig tableConfig) {
TenantConfig tenantConfig = tableConfig.getTenantConfig();
String tableNameWithType = tableConfig.getTableName();
String brokerTag = tenantConfig.getBroker();
@@ -2001,7 +1994,7 @@ public class PinotHelixResourceManager {
* The validation will run only when the task is set to be scheduled (has
the schedule config param set).
*/
@VisibleForTesting
- void validateTableTaskMinionInstanceTagConfig(TableConfig tableConfig) {
+ public void validateTableTaskMinionInstanceTagConfig(TableConfig
tableConfig) {
List<InstanceConfig> allMinionWorkerInstanceConfigs =
getAllMinionInstanceConfigs();
@@ -2161,7 +2154,8 @@ public class PinotHelixResourceManager {
}
/**
- * Sets the given table config into zookeeper
+ * Sets the given table config into zookeeper bypassing validations in
updateTableConfig
+ * TODO - Make this private and always use updateTableConfig ?
*/
public void setExistingTableConfig(TableConfig tableConfig)
throws IOException {
@@ -2178,7 +2172,12 @@ public class PinotHelixResourceManager {
String tableName = logicalTableConfig.getTableName();
LOGGER.info("Updating logical table {}: Start", tableName);
+ if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+ logicalTableConfig.setBrokerTenant("DefaultTenant");
+ }
+
validateLogicalTableConfig(logicalTableConfig);
+ validatePhysicalTablesExist(logicalTableConfig);
LogicalTableConfig oldLogicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
if (oldLogicalTableConfig == null) {
@@ -2198,6 +2197,21 @@ public class PinotHelixResourceManager {
LOGGER.info("Updated logical table {}: Successfully updated table",
tableName);
}
+ public void validatePhysicalTablesExist(LogicalTableConfig
logicalTableConfig) {
+ for (Map.Entry<String, PhysicalTableConfig> entry :
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+ PhysicalTableConfig physicalTableConfig = entry.getValue();
+ String physicalTableName = entry.getKey();
+ // Skip existence validation for multi-cluster physical tables
+ if (!physicalTableConfig.isMultiCluster()) {
+ // validate physical table exists
+ if
(!PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore).exist(physicalTableName))
{
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: '" + physicalTableName + "'
should be one of the existing tables");
+ }
+ }
+ }
+ }
+
private void updateBrokerResourceForLogicalTable(LogicalTableConfig
logicalTableConfig, String tableName) {
List<String> brokers = HelixHelper.getInstancesWithTag(
_helixZkManager,
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant()));
@@ -2209,17 +2223,37 @@ public class PinotHelixResourceManager {
});
}
- private void validateLogicalTableConfig(LogicalTableConfig
logicalTableConfig) {
- if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
- logicalTableConfig.setBrokerTenant("DefaultTenant");
+ public void validateNewLogicalTableConfig(LogicalTableConfig
logicalTableConfig) {
+ String tableName = logicalTableConfig.getTableName();
+ validateLogicalTableConfig(logicalTableConfig);
+ // Check if the logical table name is already used
+ if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
+ throw new TableAlreadyExistsException("Logical table: " + tableName + "
already exists");
}
- LogicalTableConfigUtils.validateLogicalTableConfig(
- logicalTableConfig,
-
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
- getAllBrokerTenantNames()::contains,
- _propertyStore
- );
+ // Check if the table name is already used by a physical table
+ PinotHelixPropertyStoreZnRecordProvider
pinotHelixPropertyStoreZnRecordProvider =
+ PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore);
+ if
(pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
+ ||
pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
{
+ throw new TableAlreadyExistsException("Table name: " + tableName + "
already exists");
+ }
+ }
+
+ public void validateLogicalTableConfig(LogicalTableConfig
logicalTableConfig) {
+ LogicalTableConfigUtils.validateLogicalTableConfig(logicalTableConfig);
+ // validate broker tenant exists
+ String brokerTenant = logicalTableConfig.getBrokerTenant();
+ if (!getAllBrokerTenantNames().contains(brokerTenant)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: '" + brokerTenant + "' should be one
of the existing broker tenants");
+ }
+ // Validate schema with same name as logical table exists
+ if (!ZKMetadataProvider.isSchemaExists(_propertyStore,
logicalTableConfig.getTableName())) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: Schema with same name as logical
table '"
+ + logicalTableConfig.getTableName() + "' does not exist");
+ }
}
/**
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index f65c4a98b34..5d39becb763 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -335,7 +335,8 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
}
@Test
- public void testLogicalTablePhysicalTableConfigValidation() {
+ public void testLogicalTablePhysicalTableConfigValidation()
+ throws IOException {
// Test empty physical table names is not allowed
Throwable throwable = expectThrows(IOException.class, () -> {
LogicalTableConfig tableConfig =
getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), BROKER_TENANT);
@@ -345,6 +346,7 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
throwable.getMessage());
// Test all table names are physical table names and none is hybrid table
name
+ addDummySchema(LOGICAL_TABLE_NAME);
throwable = expectThrows(IOException.class, () -> {
List<String> physicalTableNames = List.of("test_table_1");
LogicalTableConfig logicalTableConfig =
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index d82f32ce208..57e2f9f27ca 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -889,6 +889,17 @@ public final class Schema implements Serializable {
return cloned;
}
+ public static Schema cloneSchemaWithName(Schema source, String newName) {
+ try {
+ String json = JsonUtils.objectToString(source);
+ Schema cloned = JsonUtils.stringToObject(json, Schema.class);
+ cloned.setSchemaName(newName);
+ return cloned;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to clone schema", e);
+ }
+ }
+
/**
* Helper method that converts a {@link TimeFieldSpec} to {@link
DateTimeFieldSpec}
* 1) If timeFieldSpec contains only incoming granularity spec, directly
convert it to a dateTimeFieldSpec
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]