This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ec90dc16f1e Logical tables / Table Config validation refactoring 
(#17761)
ec90dc16f1e is described below

commit ec90dc16f1e77046b294ce3c0f25b6d554f72f20
Author: Krishan Goyal <[email protected]>
AuthorDate: Fri Feb 27 15:21:14 2026 +0530

    Logical tables / Table Config validation refactoring (#17761)
    
    * Move ZNRecord SerDe into TableConfig and LogicalTableConfig
    
    Move fromZNRecord/toZNRecord methods from TableConfigSerDeUtils and
    LogicalTableConfigUtils into the config classes themselves. This makes
    it easier to extend these classes — subclasses can override
    serialization/deserialization behavior directly.
    
    - Add helix-core dependency to pinot-spi for ZNRecord access
    - Add fromZNRecord()/toZNRecord() to TableConfig and LogicalTableConfig
    - Delete TableConfigSerDeUtils (all callers updated)
    - Remove fromZNRecord/toZNRecord from LogicalTableConfigUtils (validation 
stays)
    - Update all callers across 15 files to use the new methods directly
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Spotless fixes
    
    * Introduce ConfigRecord to decouple pinot-spi from helix-core
    
    Replace ZNRecord-based SerDe on TableConfig and LogicalTableConfig with
    a helix-agnostic ConfigRecord POJO in pinot-spi. Bridge utilities in
    pinot-common (TableConfigSerDeUtils, LogicalTableConfigUtils) handle
    ZNRecord conversion, keeping helix-core out of the SPI layer.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Spotless fixes
    
    * Code cleanup
    
    * Refactor fromConfigRecord to make it more extendible
    
    * add javadoc
    
    * Initial refactor of logical table / table validations
    
    * Initial refactor of logical table / table validations
    
    * Make logical table ser/de pluggable
    
    * checkstyle fixes
    
    * Cleanup code
    
    * Move broker, tenant, and schema validation to resource manager.
    
    * Cleanup dead code
    
    * Test case fixes
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../common/utils/LogicalTableConfigUtils.java      | 28 +-------
 .../api/resources/TableConfigValidationUtils.java  |  2 +
 .../helix/core/PinotHelixResourceManager.java      | 82 +++++++++++++++-------
 .../resources/PinotLogicalTableResourceTest.java   |  4 +-
 .../java/org/apache/pinot/spi/data/Schema.java     | 11 +++
 5 files changed, 75 insertions(+), 52 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index edf6b038aff..1f319981e95 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -23,13 +23,10 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.ws.rs.core.HttpHeaders;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -54,11 +51,7 @@ public class LogicalTableConfigUtils {
     return 
LogicalTableConfigSerDeProvider.getInstance().toZNRecord(logicalTableConfig);
   }
 
-  public static void validateLogicalTableConfig(
-      LogicalTableConfig logicalTableConfig,
-      Predicate<String> physicalTableExistsPredicate,
-      Predicate<String> brokerTenantExistsPredicate,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public static void validateLogicalTableConfig(LogicalTableConfig 
logicalTableConfig) {
     String tableName = logicalTableConfig.getTableName();
     if (StringUtils.isEmpty(tableName)) {
       throw new IllegalArgumentException("Invalid logical table name. Reason: 
'tableName' should not be null or empty");
@@ -100,12 +93,6 @@ public class LogicalTableConfigUtils {
                   + "' should have the same database name as logical table: " 
+ databaseName + " != "
                   + physicalTableDatabaseName);
         }
-
-        // validate physical table exists
-        if (!physicalTableExistsPredicate.test(physicalTableName)) {
-          throw new IllegalArgumentException(
-              "Invalid logical table. Reason: '" + physicalTableName + "' 
should be one of the existing tables");
-        }
       }
 
       if (TableNameBuilder.isOfflineTableResource(physicalTableName)) {
@@ -175,19 +162,6 @@ public class LogicalTableConfigUtils {
           "Invalid logical table. Reason: 'quota.storage' should not be set 
for logical table");
     }
 
-    // validate broker tenant exists
-    String brokerTenant = logicalTableConfig.getBrokerTenant();
-    if (!brokerTenantExistsPredicate.test(brokerTenant)) {
-      throw new IllegalArgumentException(
-          "Invalid logical table. Reason: '" + brokerTenant + "' should be one 
of the existing broker tenants");
-    }
-
-    // Validate schema with same name as logical table exists
-    if (!ZKMetadataProvider.isSchemaExists(propertyStore, tableName)) {
-      throw new IllegalArgumentException(
-          "Invalid logical table. Reason: Schema with same name as logical 
table '" + tableName + "' does not exist");
-    }
-
     // validate time boundary config is not null for hybrid tables
     TimeBoundaryConfig timeBoundaryConfig = 
logicalTableConfig.getTimeBoundaryConfig();
     if (logicalTableConfig.isHybridLogicalTable() && timeBoundaryConfig == 
null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
index 1b3d1c0cfe7..72e93e6d048 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigValidationUtils.java
@@ -63,6 +63,8 @@ public final class TableConfigValidationUtils {
     checkHybridTableConfig(resourceManager, tableConfig);
     TaskConfigUtils.validateTaskConfigs(tableConfig, schema, taskManager, 
typesToSkip);
     validateInstanceAssignment(resourceManager, tableConfig);
+    resourceManager.validateTableTenantConfig(tableConfig);
+    resourceManager.validateTableTaskMinionInstanceTagConfig(tableConfig);
   }
 
   private static void checkHybridTableConfig(PinotHelixResourceManager 
resourceManager, TableConfig tableConfig) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7caa81d9d79..d0f204087b0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -174,6 +174,7 @@ import org.apache.pinot.spi.controller.ControllerJobType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -1901,20 +1902,12 @@ public class PinotHelixResourceManager {
     String tableName = logicalTableConfig.getTableName();
     LOGGER.info("Adding logical table {}: Start", tableName);
 
-    validateLogicalTableConfig(logicalTableConfig);
-
-    // Check if the logical table name is already used
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
-      throw new TableAlreadyExistsException("Logical table: " + tableName + " 
already exists");
+    if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+      logicalTableConfig.setBrokerTenant("DefaultTenant");
     }
 
-    // Check if the table name is already used by a physical table
-    PinotHelixPropertyStoreZnRecordProvider 
pinotHelixPropertyStoreZnRecordProvider =
-        PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore);
-    if 
(pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
-        || 
pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
 {
-      throw new TableAlreadyExistsException("Table name: " + tableName + " 
already exists");
-    }
+    validateNewLogicalTableConfig(logicalTableConfig);
+    validatePhysicalTablesExist(logicalTableConfig);
 
     LOGGER.info("Adding logical table {}: Creating logical table config in the 
property store", tableName);
     ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
@@ -1932,7 +1925,7 @@ public class PinotHelixResourceManager {
    * these parameters must be specified in the table config.
    */
   @VisibleForTesting
-  void validateTableTenantConfig(TableConfig tableConfig) {
+  public void validateTableTenantConfig(TableConfig tableConfig) {
     TenantConfig tenantConfig = tableConfig.getTenantConfig();
     String tableNameWithType = tableConfig.getTableName();
     String brokerTag = tenantConfig.getBroker();
@@ -2001,7 +1994,7 @@ public class PinotHelixResourceManager {
    * The validation will run only when the task is set to be scheduled (has 
the schedule config param set).
    */
   @VisibleForTesting
-  void validateTableTaskMinionInstanceTagConfig(TableConfig tableConfig) {
+  public void validateTableTaskMinionInstanceTagConfig(TableConfig 
tableConfig) {
 
     List<InstanceConfig> allMinionWorkerInstanceConfigs = 
getAllMinionInstanceConfigs();
 
@@ -2161,7 +2154,8 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Sets the given table config into zookeeper
+   * Sets the given table config into zookeeper bypassing validations in 
updateTableConfig
+   * TODO - Make this private and always use updateTableConfig ?
    */
   public void setExistingTableConfig(TableConfig tableConfig)
       throws IOException {
@@ -2178,7 +2172,12 @@ public class PinotHelixResourceManager {
     String tableName = logicalTableConfig.getTableName();
     LOGGER.info("Updating logical table {}: Start", tableName);
 
+    if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+      logicalTableConfig.setBrokerTenant("DefaultTenant");
+    }
+
     validateLogicalTableConfig(logicalTableConfig);
+    validatePhysicalTablesExist(logicalTableConfig);
 
     LogicalTableConfig oldLogicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
     if (oldLogicalTableConfig == null) {
@@ -2198,6 +2197,21 @@ public class PinotHelixResourceManager {
     LOGGER.info("Updated logical table {}: Successfully updated table", 
tableName);
   }
 
+  public void validatePhysicalTablesExist(LogicalTableConfig 
logicalTableConfig) {
+    for (Map.Entry<String, PhysicalTableConfig> entry : 
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+      PhysicalTableConfig physicalTableConfig = entry.getValue();
+      String physicalTableName = entry.getKey();
+      // Skip existence validation for multi-cluster physical tables
+      if (!physicalTableConfig.isMultiCluster()) {
+        // validate physical table exists
+        if 
(!PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore).exist(physicalTableName))
 {
+          throw new IllegalArgumentException(
+              "Invalid logical table. Reason: '" + physicalTableName + "' 
should be one of the existing tables");
+        }
+      }
+    }
+  }
+
   private void updateBrokerResourceForLogicalTable(LogicalTableConfig 
logicalTableConfig, String tableName) {
     List<String> brokers = HelixHelper.getInstancesWithTag(
         _helixZkManager, 
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant()));
@@ -2209,17 +2223,37 @@ public class PinotHelixResourceManager {
     });
   }
 
-  private void validateLogicalTableConfig(LogicalTableConfig 
logicalTableConfig) {
-    if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
-      logicalTableConfig.setBrokerTenant("DefaultTenant");
+  public void validateNewLogicalTableConfig(LogicalTableConfig 
logicalTableConfig) {
+    String tableName = logicalTableConfig.getTableName();
+    validateLogicalTableConfig(logicalTableConfig);
+    // Check if the logical table name is already used
+    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
+      throw new TableAlreadyExistsException("Logical table: " + tableName + " 
already exists");
     }
 
-    LogicalTableConfigUtils.validateLogicalTableConfig(
-        logicalTableConfig,
-        
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
-        getAllBrokerTenantNames()::contains,
-        _propertyStore
-    );
+    // Check if the table name is already used by a physical table
+    PinotHelixPropertyStoreZnRecordProvider 
pinotHelixPropertyStoreZnRecordProvider =
+        PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore);
+    if 
(pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
+        || 
pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
 {
+      throw new TableAlreadyExistsException("Table name: " + tableName + " 
already exists");
+    }
+  }
+
+  public void validateLogicalTableConfig(LogicalTableConfig 
logicalTableConfig) {
+    LogicalTableConfigUtils.validateLogicalTableConfig(logicalTableConfig);
+    // validate broker tenant exists
+    String brokerTenant = logicalTableConfig.getBrokerTenant();
+    if (!getAllBrokerTenantNames().contains(brokerTenant)) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: '" + brokerTenant + "' should be one 
of the existing broker tenants");
+    }
+    // Validate schema with same name as logical table exists
+    if (!ZKMetadataProvider.isSchemaExists(_propertyStore, 
logicalTableConfig.getTableName())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: Schema with same name as logical 
table '"
+              + logicalTableConfig.getTableName() + "' does not exist");
+    }
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index f65c4a98b34..5d39becb763 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -335,7 +335,8 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
   }
 
   @Test
-  public void testLogicalTablePhysicalTableConfigValidation() {
+  public void testLogicalTablePhysicalTableConfigValidation()
+      throws IOException {
     // Test empty physical table names is not allowed
     Throwable throwable = expectThrows(IOException.class, () -> {
       LogicalTableConfig tableConfig = 
getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), BROKER_TENANT);
@@ -345,6 +346,7 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
         throwable.getMessage());
 
     // Test all table names are physical table names and none is hybrid table 
name
+    addDummySchema(LOGICAL_TABLE_NAME);
     throwable = expectThrows(IOException.class, () -> {
       List<String> physicalTableNames = List.of("test_table_1");
       LogicalTableConfig logicalTableConfig =
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index d82f32ce208..57e2f9f27ca 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -889,6 +889,17 @@ public final class Schema implements Serializable {
     return cloned;
   }
 
+  public static Schema cloneSchemaWithName(Schema source, String newName) {
+    try {
+      String json = JsonUtils.objectToString(source);
+      Schema cloned = JsonUtils.stringToObject(json, Schema.class);
+      cloned.setSchemaName(newName);
+      return cloned;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to clone schema", e);
+    }
+  }
+
   /**
    * Helper method that converts a {@link TimeFieldSpec} to {@link 
DateTimeFieldSpec}
    * 1) If timeFieldSpec contains only incoming granularity spec, directly 
convert it to a dateTimeFieldSpec


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to