This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c52aa386567 Make LogicalTableConfig serialiser / deserialiser 
pluggable (#17678)
c52aa386567 is described below

commit c52aa386567508fd46c0dd4965b5414fca086c8f
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed Feb 25 01:39:50 2026 +0530

    Make LogicalTableConfig serialiser / deserialiser pluggable (#17678)
---
 .../utils/DefaultLogicalTableConfigSerDe.java      | 128 +++++++++++++++
 .../common/utils/LogicalTableConfigSerDe.java      |  54 +++++++
 .../utils/LogicalTableConfigSerDeProvider.java     |  75 +++++++++
 .../common/utils/LogicalTableConfigUtils.java      |  72 +--------
 .../utils/DefaultLogicalTableConfigSerDeTest.java  | 175 +++++++++++++++++++++
 5 files changed, 435 insertions(+), 69 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDe.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDe.java
new file mode 100644
index 00000000000..7f95e032ce1
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDe.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@AutoService(LogicalTableConfigSerDe.class)
+public class DefaultLogicalTableConfigSerDe implements LogicalTableConfigSerDe 
{
+
+  @Override
+  public LogicalTableConfig fromZNRecord(ZNRecord znRecord)
+      throws IOException {
+    LogicalTableConfig config = createConfig();
+    populateFromZNRecord(config, znRecord);
+    return config;
+  }
+
+  protected LogicalTableConfig createConfig() {
+    return new LogicalTableConfig();
+  }
+
+  protected void populateFromZNRecord(LogicalTableConfig config, ZNRecord 
znRecord)
+      throws IOException {
+    
config.setTableName(znRecord.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY));
+    
config.setBrokerTenant(znRecord.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
+
+    String queryConfigJson = 
znRecord.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY);
+    if (queryConfigJson != null) {
+      config.setQueryConfig(JsonUtils.stringToObject(queryConfigJson, 
QueryConfig.class));
+    }
+    String quotaConfigJson = 
znRecord.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY);
+    if (quotaConfigJson != null) {
+      config.setQuotaConfig(JsonUtils.stringToObject(quotaConfigJson, 
QuotaConfig.class));
+    }
+    String refOfflineTableName = 
znRecord.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY);
+    if (refOfflineTableName != null) {
+      config.setRefOfflineTableName(refOfflineTableName);
+    }
+    String refRealtimeTableName = 
znRecord.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY);
+    if (refRealtimeTableName != null) {
+      config.setRefRealtimeTableName(refRealtimeTableName);
+    }
+    String timeBoundaryConfigJson = 
znRecord.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY);
+    if (timeBoundaryConfigJson != null) {
+      
config.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson, 
TimeBoundaryConfig.class));
+    }
+
+    populatePhysicalTableConfigs(config, znRecord);
+  }
+
+  protected void populatePhysicalTableConfigs(LogicalTableConfig config, 
ZNRecord znRecord)
+      throws IOException {
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    Map<String, String> physicalTableMapField = 
znRecord.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY);
+    if (physicalTableMapField != null) {
+      for (Map.Entry<String, String> entry : physicalTableMapField.entrySet()) 
{
+        physicalTableConfigMap.put(entry.getKey(),
+            JsonUtils.stringToObject(entry.getValue(), 
PhysicalTableConfig.class));
+      }
+    }
+    config.setPhysicalTableConfigMap(physicalTableConfigMap);
+  }
+
+  @Override
+  public ZNRecord toZNRecord(LogicalTableConfig config)
+      throws JsonProcessingException {
+    ZNRecord znRecord = new ZNRecord(config.getTableName());
+    writeToZNRecord(config, znRecord);
+    return znRecord;
+  }
+
+  protected void writeToZNRecord(LogicalTableConfig config, ZNRecord znRecord)
+      throws JsonProcessingException {
+    znRecord.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY, 
config.getTableName());
+    znRecord.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY, 
config.getBrokerTenant());
+
+    if (config.getQueryConfig() != null) {
+      znRecord.setSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY, 
config.getQueryConfig().toJsonString());
+    }
+    if (config.getQuotaConfig() != null) {
+      znRecord.setSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY, 
config.getQuotaConfig().toJsonString());
+    }
+    if (config.getRefOfflineTableName() != null) {
+      znRecord.setSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY, 
config.getRefOfflineTableName());
+    }
+    if (config.getRefRealtimeTableName() != null) {
+      znRecord.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY, 
config.getRefRealtimeTableName());
+    }
+    if (config.getTimeBoundaryConfig() != null) {
+      znRecord.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY,
+          config.getTimeBoundaryConfig().toJsonString());
+    }
+
+    Map<String, String> physicalTableConfigMapField = new HashMap<>();
+    for (Map.Entry<String, PhysicalTableConfig> entry : 
config.getPhysicalTableConfigMap().entrySet()) {
+      physicalTableConfigMapField.put(entry.getKey(), 
entry.getValue().toJsonString());
+    }
+    znRecord.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY, 
physicalTableConfigMapField);
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDe.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDe.java
new file mode 100644
index 00000000000..f83d8983b12
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDe.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+
+
+/**
+ * Pluggable serializer/deserializer for {@link LogicalTableConfig} stored in 
ZooKeeper as {@link ZNRecord}.
+ *
+ * <p>Implementations are discovered via {@link java.util.ServiceLoader}. When 
multiple implementations
+ * are present, the one with the highest {@link #getPriority()} wins.</p>
+ */
+public interface LogicalTableConfigSerDe {
+
+  /**
+   * Deserializes a {@link LogicalTableConfig} from the given {@link ZNRecord}.
+   */
+  LogicalTableConfig fromZNRecord(ZNRecord znRecord)
+      throws IOException;
+
+  /**
+   * Serializes a {@link LogicalTableConfig} into a {@link ZNRecord}.
+   */
+  ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig)
+      throws JsonProcessingException;
+
+  /**
+   * Returns the priority of this implementation. Higher values win over lower 
values
+   * when multiple implementations are discovered via ServiceLoader.
+   */
+  default int getPriority() {
+    return 0;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDeProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDeProvider.java
new file mode 100644
index 00000000000..b472f5303ea
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDeProvider.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ServiceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton holder for {@link LogicalTableConfigSerDe}, loaded via {@link 
ServiceLoader}.
+ *
+ * <p>When multiple implementations are discovered, the one with the highest
+ * {@link LogicalTableConfigSerDe#getPriority()} is selected</p>
+ */
+public class LogicalTableConfigSerDeProvider {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LogicalTableConfigSerDeProvider.class);
+
+  private static volatile LogicalTableConfigSerDe _instance = 
fromServiceLoader();
+
+  private LogicalTableConfigSerDeProvider() {
+  }
+
+  public static LogicalTableConfigSerDe getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Overrides the singleton instance. Useful for testing.
+   */
+  @VisibleForTesting
+  public static void setInstance(LogicalTableConfigSerDe serDe) {
+    _instance = serDe;
+  }
+
+  private static LogicalTableConfigSerDe fromServiceLoader() {
+    LogicalTableConfigSerDe best = null;
+    int bestPriority = Integer.MIN_VALUE;
+
+    for (LogicalTableConfigSerDe serDe : 
ServiceLoader.load(LogicalTableConfigSerDe.class)) {
+      LOGGER.info("Discovered LogicalTableConfigSerDe: {} with priority {}",
+          serDe.getClass().getName(), serDe.getPriority());
+      if (serDe.getPriority() > bestPriority) {
+        best = serDe;
+        bestPriority = serDe.getPriority();
+      }
+    }
+
+    if (best == null) {
+      throw new RuntimeException("No implementation of LogicalTableConfigSerDe 
found");
+    }
+
+    LOGGER.info("Selected LogicalTableConfigSerDe: {} with priority {}",
+        best.getClass().getName(), bestPriority);
+
+    return best;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index 99a593845e5..edf6b038aff 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -20,7 +20,6 @@ package org.apache.pinot.common.utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -31,14 +30,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.TimeBoundaryConfig;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -48,76 +44,14 @@ public class LogicalTableConfigUtils {
     // Utility class
   }
 
-  public static LogicalTableConfig fromZNRecord(ZNRecord record)
+  public static LogicalTableConfig fromZNRecord(ZNRecord znRecord)
       throws IOException {
-    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
-        
.setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY))
-        
.setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
-
-    if (record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY) != null) {
-      
builder.setQueryConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY),
-          QueryConfig.class));
-    }
-    if (record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY) != null) {
-      
builder.setQuotaConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY),
-          QuotaConfig.class));
-    }
-    if (record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY) 
!= null) {
-      
builder.setRefOfflineTableName(record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY));
-    }
-    if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY) 
!= null) {
-      
builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
-    }
-    String timeBoundaryConfigJson = 
record.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY);
-    if (timeBoundaryConfigJson != null) {
-      
builder.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson, 
TimeBoundaryConfig.class));
-    }
-
-    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
-    for (Map.Entry<String, String> entry : 
record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
-        .entrySet()) {
-      String physicalTableName = entry.getKey();
-      String physicalTableConfigJson = entry.getValue();
-      physicalTableConfigMap.put(physicalTableName,
-          JsonUtils.stringToObject(physicalTableConfigJson, 
PhysicalTableConfig.class));
-    }
-    builder.setPhysicalTableConfigMap(physicalTableConfigMap);
-    return builder.build();
+    return 
LogicalTableConfigSerDeProvider.getInstance().fromZNRecord(znRecord);
   }
 
   public static ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig)
       throws JsonProcessingException {
-    Map<String, String> physicalTableConfigMap = new HashMap<>();
-    for (Map.Entry<String, PhysicalTableConfig> entry : 
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
-      String physicalTableName = entry.getKey();
-      PhysicalTableConfig physicalTableConfig = entry.getValue();
-      physicalTableConfigMap.put(physicalTableName, 
physicalTableConfig.toJsonString());
-    }
-
-    ZNRecord record = new ZNRecord(logicalTableConfig.getTableName());
-    record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY, 
logicalTableConfig.getTableName());
-    record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY, 
logicalTableConfig.getBrokerTenant());
-    record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY, 
physicalTableConfigMap);
-
-    if (logicalTableConfig.getQueryConfig() != null) {
-      record.setSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY, 
logicalTableConfig.getQueryConfig().toJsonString());
-    }
-    if (logicalTableConfig.getQuotaConfig() != null) {
-      record.setSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY, 
logicalTableConfig.getQuotaConfig().toJsonString());
-    }
-    if (logicalTableConfig.getRefOfflineTableName() != null) {
-      record.setSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY,
-          logicalTableConfig.getRefOfflineTableName());
-    }
-    if (logicalTableConfig.getRefRealtimeTableName() != null) {
-      record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY,
-          logicalTableConfig.getRefRealtimeTableName());
-    }
-    if (logicalTableConfig.getTimeBoundaryConfig() != null) {
-      record.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY,
-          logicalTableConfig.getTimeBoundaryConfig().toJsonString());
-    }
-    return record;
+    return 
LogicalTableConfigSerDeProvider.getInstance().toZNRecord(logicalTableConfig);
   }
 
   public static void validateLogicalTableConfig(
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDeTest.java
new file mode 100644
index 00000000000..0dbc917d83f
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDeTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for {@link LogicalTableConfig} JSON serialization and
+ * {@link DefaultLogicalTableConfigSerDe} ZNRecord round-trip serialization.
+ */
+public class DefaultLogicalTableConfigSerDeTest {
+
+  private final DefaultLogicalTableConfigSerDe _serDe = new 
DefaultLogicalTableConfigSerDe();
+
+  @Test
+  public void testMinimalConfig()
+      throws Exception {
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    physicalTableConfigMap.put("table_OFFLINE", new PhysicalTableConfig());
+
+    LogicalTableConfig config = new LogicalTableConfigBuilder()
+        .setTableName("logicalTable")
+        .setBrokerTenant("defaultTenant")
+        .setPhysicalTableConfigMap(physicalTableConfigMap)
+        .build();
+
+    checkMinimalConfig(config);
+    checkMinimalConfig(jsonRoundTrip(config));
+    checkMinimalConfig(znRecordRoundTrip(config));
+  }
+
+  private void checkMinimalConfig(LogicalTableConfig config) {
+    assertEquals(config.getTableName(), "logicalTable");
+    assertEquals(config.getBrokerTenant(), "defaultTenant");
+    assertNotNull(config.getPhysicalTableConfigMap());
+    assertEquals(config.getPhysicalTableConfigMap().size(), 1);
+    
assertTrue(config.getPhysicalTableConfigMap().containsKey("table_OFFLINE"));
+    assertNull(config.getQueryConfig());
+    assertNull(config.getQuotaConfig());
+    assertNull(config.getRefOfflineTableName());
+    assertNull(config.getRefRealtimeTableName());
+    assertNull(config.getTimeBoundaryConfig());
+    assertFalse(config.isHybridLogicalTable());
+  }
+
+  @Test
+  public void testFullConfig()
+      throws Exception {
+    QuotaConfig quotaConfig = new QuotaConfig(null, "200.00");
+    QueryConfig queryConfig =
+        new QueryConfig(3000L, false, true, 
Collections.singletonMap("func(a)", "b"), null, null);
+    Map<String, Object> params = new HashMap<>();
+    params.put("key", "value");
+    TimeBoundaryConfig timeBoundaryConfig = new TimeBoundaryConfig("MIN", 
params);
+
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    physicalTableConfigMap.put("table_OFFLINE", new PhysicalTableConfig());
+    physicalTableConfigMap.put("table_REALTIME", new PhysicalTableConfig());
+    physicalTableConfigMap.put("remote_table_OFFLINE", new 
PhysicalTableConfig(true));
+
+    LogicalTableConfig config = new LogicalTableConfigBuilder()
+        .setTableName("myLogicalTable")
+        .setBrokerTenant("myTenant")
+        .setPhysicalTableConfigMap(physicalTableConfigMap)
+        .setQuotaConfig(quotaConfig)
+        .setQueryConfig(queryConfig)
+        .setRefOfflineTableName("table_OFFLINE")
+        .setRefRealtimeTableName("table_REALTIME")
+        .setTimeBoundaryConfig(timeBoundaryConfig)
+        .build();
+
+    checkFullConfig(config);
+    checkFullConfig(jsonRoundTrip(config));
+    checkFullConfig(znRecordRoundTrip(config));
+  }
+
+  private void checkFullConfig(LogicalTableConfig config) {
+    assertEquals(config.getTableName(), "myLogicalTable");
+    assertEquals(config.getBrokerTenant(), "myTenant");
+    assertTrue(config.isHybridLogicalTable());
+
+    assertEquals(config.getPhysicalTableConfigMap().size(), 3);
+    
assertFalse(config.getPhysicalTableConfigMap().get("table_OFFLINE").isMultiCluster());
+    
assertFalse(config.getPhysicalTableConfigMap().get("table_REALTIME").isMultiCluster());
+    
assertTrue(config.getPhysicalTableConfigMap().get("remote_table_OFFLINE").isMultiCluster());
+
+    assertNotNull(config.getQuotaConfig());
+    assertEquals(config.getQuotaConfig().getMaxQueriesPerSecond(), "200.0");
+
+    QueryConfig queryConfig = config.getQueryConfig();
+    assertNotNull(queryConfig);
+    assertEquals(queryConfig.getTimeoutMs(), Long.valueOf(3000L));
+    assertEquals(queryConfig.getDisableGroovy(), Boolean.FALSE);
+    assertEquals(queryConfig.getExpressionOverrideMap(), 
Collections.singletonMap("func(a)", "b"));
+
+    assertEquals(config.getRefOfflineTableName(), "table_OFFLINE");
+    assertEquals(config.getRefRealtimeTableName(), "table_REALTIME");
+
+    TimeBoundaryConfig tbc = config.getTimeBoundaryConfig();
+    assertNotNull(tbc);
+    assertEquals(tbc.getBoundaryStrategy(), "MIN");
+    assertEquals(tbc.getParameters().get("key"), "value");
+  }
+
+  @Test
+  public void testZNRecordFieldMapping()
+      throws Exception {
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    physicalTableConfigMap.put("table_OFFLINE", new PhysicalTableConfig());
+
+    LogicalTableConfig config = new LogicalTableConfigBuilder()
+        .setTableName("logicalTable")
+        .setBrokerTenant("testTenant")
+        .setPhysicalTableConfigMap(physicalTableConfigMap)
+        .setRefOfflineTableName("table_OFFLINE")
+        .build();
+
+    ZNRecord znRecord = _serDe.toZNRecord(config);
+
+    assertEquals(znRecord.getId(), "logicalTable");
+    
assertEquals(znRecord.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY),
 "logicalTable");
+    
assertEquals(znRecord.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY), 
"testTenant");
+    
assertEquals(znRecord.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY),
 "table_OFFLINE");
+    
assertNull(znRecord.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
+    assertNull(znRecord.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY));
+    assertNull(znRecord.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY));
+    
assertNull(znRecord.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY));
+
+    Map<String, String> mapField = 
znRecord.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY);
+    assertNotNull(mapField);
+    assertEquals(mapField.size(), 1);
+    assertTrue(mapField.containsKey("table_OFFLINE"));
+  }
+
+  private LogicalTableConfig jsonRoundTrip(LogicalTableConfig config)
+      throws Exception {
+    return JsonUtils.stringToObject(config.toSingleLineJsonString(), 
LogicalTableConfig.class);
+  }
+
+  private LogicalTableConfig znRecordRoundTrip(LogicalTableConfig config)
+      throws Exception {
+    ZNRecord znRecord = _serDe.toZNRecord(config);
+    return _serDe.fromZNRecord(znRecord);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to