This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c52aa386567 Make LogicalTableConfig serialiser / deserialiser
pluggable (#17678)
c52aa386567 is described below
commit c52aa386567508fd46c0dd4965b5414fca086c8f
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed Feb 25 01:39:50 2026 +0530
Make LogicalTableConfig serialiser / deserialiser pluggable (#17678)
---
.../utils/DefaultLogicalTableConfigSerDe.java | 128 +++++++++++++++
.../common/utils/LogicalTableConfigSerDe.java | 54 +++++++
.../utils/LogicalTableConfigSerDeProvider.java | 75 +++++++++
.../common/utils/LogicalTableConfigUtils.java | 72 +--------
.../utils/DefaultLogicalTableConfigSerDeTest.java | 175 +++++++++++++++++++++
5 files changed, 435 insertions(+), 69 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDe.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDe.java
new file mode 100644
index 00000000000..7f95e032ce1
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDe.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@AutoService(LogicalTableConfigSerDe.class)
+public class DefaultLogicalTableConfigSerDe implements LogicalTableConfigSerDe
{
+
+ @Override
+ public LogicalTableConfig fromZNRecord(ZNRecord znRecord)
+ throws IOException {
+ LogicalTableConfig config = createConfig();
+ populateFromZNRecord(config, znRecord);
+ return config;
+ }
+
+ protected LogicalTableConfig createConfig() {
+ return new LogicalTableConfig();
+ }
+
+ protected void populateFromZNRecord(LogicalTableConfig config, ZNRecord
znRecord)
+ throws IOException {
+
config.setTableName(znRecord.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY));
+
config.setBrokerTenant(znRecord.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
+
+ String queryConfigJson =
znRecord.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY);
+ if (queryConfigJson != null) {
+ config.setQueryConfig(JsonUtils.stringToObject(queryConfigJson,
QueryConfig.class));
+ }
+ String quotaConfigJson =
znRecord.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY);
+ if (quotaConfigJson != null) {
+ config.setQuotaConfig(JsonUtils.stringToObject(quotaConfigJson,
QuotaConfig.class));
+ }
+ String refOfflineTableName =
znRecord.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY);
+ if (refOfflineTableName != null) {
+ config.setRefOfflineTableName(refOfflineTableName);
+ }
+ String refRealtimeTableName =
znRecord.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY);
+ if (refRealtimeTableName != null) {
+ config.setRefRealtimeTableName(refRealtimeTableName);
+ }
+ String timeBoundaryConfigJson =
znRecord.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY);
+ if (timeBoundaryConfigJson != null) {
+
config.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson,
TimeBoundaryConfig.class));
+ }
+
+ populatePhysicalTableConfigs(config, znRecord);
+ }
+
+ protected void populatePhysicalTableConfigs(LogicalTableConfig config,
ZNRecord znRecord)
+ throws IOException {
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ Map<String, String> physicalTableMapField =
znRecord.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY);
+ if (physicalTableMapField != null) {
+ for (Map.Entry<String, String> entry : physicalTableMapField.entrySet())
{
+ physicalTableConfigMap.put(entry.getKey(),
+ JsonUtils.stringToObject(entry.getValue(),
PhysicalTableConfig.class));
+ }
+ }
+ config.setPhysicalTableConfigMap(physicalTableConfigMap);
+ }
+
+ @Override
+ public ZNRecord toZNRecord(LogicalTableConfig config)
+ throws JsonProcessingException {
+ ZNRecord znRecord = new ZNRecord(config.getTableName());
+ writeToZNRecord(config, znRecord);
+ return znRecord;
+ }
+
+ protected void writeToZNRecord(LogicalTableConfig config, ZNRecord znRecord)
+ throws JsonProcessingException {
+ znRecord.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY,
config.getTableName());
+ znRecord.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY,
config.getBrokerTenant());
+
+ if (config.getQueryConfig() != null) {
+ znRecord.setSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY,
config.getQueryConfig().toJsonString());
+ }
+ if (config.getQuotaConfig() != null) {
+ znRecord.setSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY,
config.getQuotaConfig().toJsonString());
+ }
+ if (config.getRefOfflineTableName() != null) {
+ znRecord.setSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY,
config.getRefOfflineTableName());
+ }
+ if (config.getRefRealtimeTableName() != null) {
+ znRecord.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY,
config.getRefRealtimeTableName());
+ }
+ if (config.getTimeBoundaryConfig() != null) {
+ znRecord.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY,
+ config.getTimeBoundaryConfig().toJsonString());
+ }
+
+ Map<String, String> physicalTableConfigMapField = new HashMap<>();
+ for (Map.Entry<String, PhysicalTableConfig> entry :
config.getPhysicalTableConfigMap().entrySet()) {
+ physicalTableConfigMapField.put(entry.getKey(),
entry.getValue().toJsonString());
+ }
+ znRecord.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY,
physicalTableConfigMapField);
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDe.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDe.java
new file mode 100644
index 00000000000..f83d8983b12
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDe.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+
+
+/**
+ * Pluggable serializer/deserializer for {@link LogicalTableConfig} stored in
ZooKeeper as {@link ZNRecord}.
+ *
+ * <p>Implementations are discovered via {@link java.util.ServiceLoader}. When
multiple implementations
+ * are present, the one with the highest {@link #getPriority()} wins.</p>
+ */
+public interface LogicalTableConfigSerDe {
+
+ /**
+ * Deserializes a {@link LogicalTableConfig} from the given {@link ZNRecord}.
+ */
+ LogicalTableConfig fromZNRecord(ZNRecord znRecord)
+ throws IOException;
+
+ /**
+ * Serializes a {@link LogicalTableConfig} into a {@link ZNRecord}.
+ */
+ ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig)
+ throws JsonProcessingException;
+
+ /**
+ * Returns the priority of this implementation. Higher values win over lower
values
+ * when multiple implementations are discovered via ServiceLoader.
+ */
+ default int getPriority() {
+ return 0;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDeProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDeProvider.java
new file mode 100644
index 00000000000..b472f5303ea
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigSerDeProvider.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ServiceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton holder for {@link LogicalTableConfigSerDe}, loaded via {@link
ServiceLoader}.
+ *
+ * <p>When multiple implementations are discovered, the one with the highest
+ * {@link LogicalTableConfigSerDe#getPriority()} is selected</p>
+ */
+public class LogicalTableConfigSerDeProvider {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LogicalTableConfigSerDeProvider.class);
+
+ private static volatile LogicalTableConfigSerDe _instance =
fromServiceLoader();
+
+ private LogicalTableConfigSerDeProvider() {
+ }
+
+ public static LogicalTableConfigSerDe getInstance() {
+ return _instance;
+ }
+
+ /**
+ * Overrides the singleton instance. Useful for testing.
+ */
+ @VisibleForTesting
+ public static void setInstance(LogicalTableConfigSerDe serDe) {
+ _instance = serDe;
+ }
+
+ private static LogicalTableConfigSerDe fromServiceLoader() {
+ LogicalTableConfigSerDe best = null;
+ int bestPriority = Integer.MIN_VALUE;
+
+ for (LogicalTableConfigSerDe serDe :
ServiceLoader.load(LogicalTableConfigSerDe.class)) {
+ LOGGER.info("Discovered LogicalTableConfigSerDe: {} with priority {}",
+ serDe.getClass().getName(), serDe.getPriority());
+ if (serDe.getPriority() > bestPriority) {
+ best = serDe;
+ bestPriority = serDe.getPriority();
+ }
+ }
+
+ if (best == null) {
+ throw new RuntimeException("No implementation of LogicalTableConfigSerDe
found");
+ }
+
+ LOGGER.info("Selected LogicalTableConfigSerDe: {} with priority {}",
+ best.getClass().getName(), bestPriority);
+
+ return best;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index 99a593845e5..edf6b038aff 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -20,7 +20,6 @@ package org.apache.pinot.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -31,14 +30,11 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.TimeBoundaryConfig;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -48,76 +44,14 @@ public class LogicalTableConfigUtils {
// Utility class
}
- public static LogicalTableConfig fromZNRecord(ZNRecord record)
+ public static LogicalTableConfig fromZNRecord(ZNRecord znRecord)
throws IOException {
- LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
-
.setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY))
-
.setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
-
- if (record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY) != null) {
-
builder.setQueryConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY),
- QueryConfig.class));
- }
- if (record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY) != null) {
-
builder.setQuotaConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY),
- QuotaConfig.class));
- }
- if (record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY)
!= null) {
-
builder.setRefOfflineTableName(record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY));
- }
- if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY)
!= null) {
-
builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
- }
- String timeBoundaryConfigJson =
record.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY);
- if (timeBoundaryConfigJson != null) {
-
builder.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson,
TimeBoundaryConfig.class));
- }
-
- Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
- for (Map.Entry<String, String> entry :
record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
- .entrySet()) {
- String physicalTableName = entry.getKey();
- String physicalTableConfigJson = entry.getValue();
- physicalTableConfigMap.put(physicalTableName,
- JsonUtils.stringToObject(physicalTableConfigJson,
PhysicalTableConfig.class));
- }
- builder.setPhysicalTableConfigMap(physicalTableConfigMap);
- return builder.build();
+ return
LogicalTableConfigSerDeProvider.getInstance().fromZNRecord(znRecord);
}
public static ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig)
throws JsonProcessingException {
- Map<String, String> physicalTableConfigMap = new HashMap<>();
- for (Map.Entry<String, PhysicalTableConfig> entry :
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
- String physicalTableName = entry.getKey();
- PhysicalTableConfig physicalTableConfig = entry.getValue();
- physicalTableConfigMap.put(physicalTableName,
physicalTableConfig.toJsonString());
- }
-
- ZNRecord record = new ZNRecord(logicalTableConfig.getTableName());
- record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY,
logicalTableConfig.getTableName());
- record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY,
logicalTableConfig.getBrokerTenant());
- record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY,
physicalTableConfigMap);
-
- if (logicalTableConfig.getQueryConfig() != null) {
- record.setSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY,
logicalTableConfig.getQueryConfig().toJsonString());
- }
- if (logicalTableConfig.getQuotaConfig() != null) {
- record.setSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY,
logicalTableConfig.getQuotaConfig().toJsonString());
- }
- if (logicalTableConfig.getRefOfflineTableName() != null) {
- record.setSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY,
- logicalTableConfig.getRefOfflineTableName());
- }
- if (logicalTableConfig.getRefRealtimeTableName() != null) {
- record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY,
- logicalTableConfig.getRefRealtimeTableName());
- }
- if (logicalTableConfig.getTimeBoundaryConfig() != null) {
- record.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY,
- logicalTableConfig.getTimeBoundaryConfig().toJsonString());
- }
- return record;
+ return
LogicalTableConfigSerDeProvider.getInstance().toZNRecord(logicalTableConfig);
}
public static void validateLogicalTableConfig(
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDeTest.java
new file mode 100644
index 00000000000..0dbc917d83f
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DefaultLogicalTableConfigSerDeTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for {@link LogicalTableConfig} JSON serialization and
+ * {@link DefaultLogicalTableConfigSerDe} ZNRecord round-trip serialization.
+ */
+public class DefaultLogicalTableConfigSerDeTest {
+
+ private final DefaultLogicalTableConfigSerDe _serDe = new
DefaultLogicalTableConfigSerDe();
+
+ @Test
+ public void testMinimalConfig()
+ throws Exception {
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ physicalTableConfigMap.put("table_OFFLINE", new PhysicalTableConfig());
+
+ LogicalTableConfig config = new LogicalTableConfigBuilder()
+ .setTableName("logicalTable")
+ .setBrokerTenant("defaultTenant")
+ .setPhysicalTableConfigMap(physicalTableConfigMap)
+ .build();
+
+ checkMinimalConfig(config);
+ checkMinimalConfig(jsonRoundTrip(config));
+ checkMinimalConfig(znRecordRoundTrip(config));
+ }
+
+ private void checkMinimalConfig(LogicalTableConfig config) {
+ assertEquals(config.getTableName(), "logicalTable");
+ assertEquals(config.getBrokerTenant(), "defaultTenant");
+ assertNotNull(config.getPhysicalTableConfigMap());
+ assertEquals(config.getPhysicalTableConfigMap().size(), 1);
+
assertTrue(config.getPhysicalTableConfigMap().containsKey("table_OFFLINE"));
+ assertNull(config.getQueryConfig());
+ assertNull(config.getQuotaConfig());
+ assertNull(config.getRefOfflineTableName());
+ assertNull(config.getRefRealtimeTableName());
+ assertNull(config.getTimeBoundaryConfig());
+ assertFalse(config.isHybridLogicalTable());
+ }
+
+ @Test
+ public void testFullConfig()
+ throws Exception {
+ QuotaConfig quotaConfig = new QuotaConfig(null, "200.00");
+ QueryConfig queryConfig =
+ new QueryConfig(3000L, false, true,
Collections.singletonMap("func(a)", "b"), null, null);
+ Map<String, Object> params = new HashMap<>();
+ params.put("key", "value");
+ TimeBoundaryConfig timeBoundaryConfig = new TimeBoundaryConfig("MIN",
params);
+
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ physicalTableConfigMap.put("table_OFFLINE", new PhysicalTableConfig());
+ physicalTableConfigMap.put("table_REALTIME", new PhysicalTableConfig());
+ physicalTableConfigMap.put("remote_table_OFFLINE", new
PhysicalTableConfig(true));
+
+ LogicalTableConfig config = new LogicalTableConfigBuilder()
+ .setTableName("myLogicalTable")
+ .setBrokerTenant("myTenant")
+ .setPhysicalTableConfigMap(physicalTableConfigMap)
+ .setQuotaConfig(quotaConfig)
+ .setQueryConfig(queryConfig)
+ .setRefOfflineTableName("table_OFFLINE")
+ .setRefRealtimeTableName("table_REALTIME")
+ .setTimeBoundaryConfig(timeBoundaryConfig)
+ .build();
+
+ checkFullConfig(config);
+ checkFullConfig(jsonRoundTrip(config));
+ checkFullConfig(znRecordRoundTrip(config));
+ }
+
+ private void checkFullConfig(LogicalTableConfig config) {
+ assertEquals(config.getTableName(), "myLogicalTable");
+ assertEquals(config.getBrokerTenant(), "myTenant");
+ assertTrue(config.isHybridLogicalTable());
+
+ assertEquals(config.getPhysicalTableConfigMap().size(), 3);
+
assertFalse(config.getPhysicalTableConfigMap().get("table_OFFLINE").isMultiCluster());
+
assertFalse(config.getPhysicalTableConfigMap().get("table_REALTIME").isMultiCluster());
+
assertTrue(config.getPhysicalTableConfigMap().get("remote_table_OFFLINE").isMultiCluster());
+
+ assertNotNull(config.getQuotaConfig());
+ assertEquals(config.getQuotaConfig().getMaxQueriesPerSecond(), "200.0");
+
+ QueryConfig queryConfig = config.getQueryConfig();
+ assertNotNull(queryConfig);
+ assertEquals(queryConfig.getTimeoutMs(), Long.valueOf(3000L));
+ assertEquals(queryConfig.getDisableGroovy(), Boolean.FALSE);
+ assertEquals(queryConfig.getExpressionOverrideMap(),
Collections.singletonMap("func(a)", "b"));
+
+ assertEquals(config.getRefOfflineTableName(), "table_OFFLINE");
+ assertEquals(config.getRefRealtimeTableName(), "table_REALTIME");
+
+ TimeBoundaryConfig tbc = config.getTimeBoundaryConfig();
+ assertNotNull(tbc);
+ assertEquals(tbc.getBoundaryStrategy(), "MIN");
+ assertEquals(tbc.getParameters().get("key"), "value");
+ }
+
+ @Test
+ public void testZNRecordFieldMapping()
+ throws Exception {
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ physicalTableConfigMap.put("table_OFFLINE", new PhysicalTableConfig());
+
+ LogicalTableConfig config = new LogicalTableConfigBuilder()
+ .setTableName("logicalTable")
+ .setBrokerTenant("testTenant")
+ .setPhysicalTableConfigMap(physicalTableConfigMap)
+ .setRefOfflineTableName("table_OFFLINE")
+ .build();
+
+ ZNRecord znRecord = _serDe.toZNRecord(config);
+
+ assertEquals(znRecord.getId(), "logicalTable");
+
assertEquals(znRecord.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY),
"logicalTable");
+
assertEquals(znRecord.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY),
"testTenant");
+
assertEquals(znRecord.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY),
"table_OFFLINE");
+
assertNull(znRecord.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
+ assertNull(znRecord.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY));
+ assertNull(znRecord.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY));
+
assertNull(znRecord.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY));
+
+ Map<String, String> mapField =
znRecord.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY);
+ assertNotNull(mapField);
+ assertEquals(mapField.size(), 1);
+ assertTrue(mapField.containsKey("table_OFFLINE"));
+ }
+
+ private LogicalTableConfig jsonRoundTrip(LogicalTableConfig config)
+ throws Exception {
+ return JsonUtils.stringToObject(config.toSingleLineJsonString(),
LogicalTableConfig.class);
+ }
+
+ private LogicalTableConfig znRecordRoundTrip(LogicalTableConfig config)
+ throws Exception {
+ ZNRecord znRecord = _serDe.toZNRecord(config);
+ return _serDe.fromZNRecord(znRecord);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]