This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 84c0b8d660 [Improve][API] Unified tables_configs and table_list (#8100)
84c0b8d660 is described below
commit 84c0b8d66075e6b921b550a01380c665937b60ba
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Nov 25 14:39:09 2024 +0800
[Improve][API] Unified tables_configs and table_list (#8100)
---
docs/en/concept/schema-feature.md | 40 +++++++++++++++
docs/en/connector-v2/source/Hive.md | 18 +++++++
docs/en/connector-v2/source/kafka.md | 59 ++++++++++++++++++++++
docs/zh/concept/schema-feature.md | 40 +++++++++++++++
docs/zh/connector-v2/source/Kafka.md | 59 ++++++++++++++++++++++
.../api/table/catalog/CatalogOptions.java | 10 ++++
.../table/catalog/schema/TableSchemaOptions.java | 8 +++
.../seatunnel/assertion/sink/AssertConfig.java | 8 ---
.../seatunnel/assertion/sink/AssertSink.java | 2 +-
.../seatunnel/fake/config/FakeOption.java | 6 ---
.../fake/config/MultipleTableFakeSourceConfig.java | 5 +-
.../seatunnel/fake/source/FakeSourceFactory.java | 3 +-
.../config/BaseMultipleTableFileSourceConfig.java | 5 +-
.../file/config/BaseSourceConfigOptions.java | 10 ----
.../file/local/source/LocalFileSourceFactory.java | 2 +-
.../file/oss/source/OssFileSourceFactory.java | 4 +-
.../{BaseHiveOptions.java => HiveOptions.java} | 2 +-
.../connectors/seatunnel/hive/sink/HiveSink.java | 12 ++---
.../seatunnel/hive/sink/HiveSinkOptions.java | 4 +-
.../seatunnel/hive/source/HiveSourceFactory.java | 5 +-
.../hive/source/config/HiveSourceOptions.java | 36 -------------
.../config/MultipleTableHiveSourceConfig.java | 21 ++++++--
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 6 +--
.../seatunnel/hive/utils/HiveTableUtils.java | 4 +-
.../connectors/seatunnel/kafka/config/Config.java | 7 ---
.../seatunnel/kafka/source/KafkaSourceConfig.java | 14 +++--
.../seatunnel/kafka/source/KafkaSourceFactory.java | 5 +-
.../seatunnel/kudu/config/KuduSourceConfig.java | 9 ----
.../kudu/config/KuduSourceTableConfig.java | 5 +-
.../seatunnel/kudu/source/KuduSourceFactory.java | 2 +-
30 files changed, 295 insertions(+), 116 deletions(-)
diff --git a/docs/en/concept/schema-feature.md
b/docs/en/concept/schema-feature.md
index 7f88b87d06..3a4e83e06e 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -172,6 +172,46 @@ constraintKeys = [
| INDEX_KEY | key |
| UNIQUE_KEY | unique key |
+## Multi table schemas
+
+```
+tables_configs = [
+ {
+ schema {
+ table = "database.schema.table1"
+ schema_first = false
+ comment = "comment"
+ columns = [
+ ...
+ ]
+ primaryKey {
+ ...
+ }
+ constraintKeys {
+ ...
+ }
+ }
+ },
+ {
+ schema = {
+ table = "database.schema.table2"
+ schema_first = false
+ comment = "comment"
+ columns = [
+ ...
+ ]
+ primaryKey {
+ ...
+ }
+ constraintKeys {
+ ...
+ }
+ }
+ }
+]
+
+```
+
## How to use schema
### Recommended
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index 6667ccc8ee..af4edc4730 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -120,6 +120,24 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
```
### Example 2: Multiple tables
+> Note: Hive is a structured data source and should use 'table_list';
'tables_configs' will be removed in the future.
+
+```bash
+
+ Hive {
+ table_list = [
+ {
+ table_name = "default.seatunnel_orc_1"
+ metastore_uri = "thrift://namenode001:9083"
+ },
+ {
+ table_name = "default.seatunnel_orc_2"
+ metastore_uri = "thrift://namenode001:9083"
+ }
+ ]
+ }
+
+```
```bash
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index bcc659747b..dfc23a7572 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -189,6 +189,65 @@ source {
> Data is parsed according to the different Kafka topics and formats and
> written to the same pg table; upsert operations are performed based on the id.
+> Note: Kafka is an unstructured data source and should use
'tables_configs'; 'table_list' will be removed in the future.
+
+```hocon
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_e2e:9092"
+ tables_configs = [
+ {
+ topic = "^test-ogg-sou.*"
+ pattern = "true"
+ consumer.group = "ogg_multi_group"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = ogg_json
+ },
+ {
+ topic = "test-cdc_mds"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = canal_json
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = test
+ table = public.sink
+ primary_keys = ["id"]
+ }
+}
+```
+
```hocon
env {
diff --git a/docs/zh/concept/schema-feature.md
b/docs/zh/concept/schema-feature.md
index e9aacb1703..b504d264f8 100644
--- a/docs/zh/concept/schema-feature.md
+++ b/docs/zh/concept/schema-feature.md
@@ -172,6 +172,46 @@ constraintKeys = [
| INDEX_KEY | 键 |
| UNIQUE_KEY | 唯一键 |
+## 多表Schema
+
+```
+tables_configs = [
+ {
+ schema {
+ table = "database.schema.table1"
+ schema_first = false
+ comment = "comment"
+ columns = [
+ ...
+ ]
+ primaryKey {
+ ...
+ }
+ constraintKeys {
+ ...
+ }
+ }
+ },
+ {
+ schema = {
+ table = "database.schema.table2"
+ schema_first = false
+ comment = "comment"
+ columns = [
+ ...
+ ]
+ primaryKey {
+ ...
+ }
+ constraintKeys {
+ ...
+ }
+ }
+ }
+]
+
+```
+
## 如何使用schema
### 推荐
diff --git a/docs/zh/connector-v2/source/Kafka.md
b/docs/zh/connector-v2/source/Kafka.md
index c2ff4ee125..04820cc7c1 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -181,6 +181,65 @@ source {
> 根据不同的 Kafka 主题和格式解析数据,并基于 ID 执行 upsert 操作。
+> 注意: Kafka是一个非结构化数据源,应该使用`tables_configs`,将来会删除`table_list`
+
+```hocon
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_e2e:9092"
+ tables_configs = [
+ {
+ topic = "^test-ogg-sou.*"
+ pattern = "true"
+ consumer.group = "ogg_multi_group"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = ogg_json
+ },
+ {
+ topic = "test-cdc_mds"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = canal_json
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = test
+ table = public.sink
+ primary_keys = ["id"]
+ }
+}
+```
+
```hocon
env {
execution.parallelism = 1
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
index 2d1a3bc41b..046ac1dbed 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.api.table.catalog;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
@@ -56,4 +58,12 @@ public interface CatalogOptions {
.withDescription(
"The table names RegEx of the database to capture."
+ "The table name needs to include the
database name, for example: database_.*\\.table_.*");
+
+ Option<List<Map<String, Object>>> TABLE_LIST =
+ Options.key("table_list")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription(
+ "SeaTunnel Multi Table Schema, acts on structed
data sources. "
+ + "such as jdbc, paimon, doris, etc");
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
index 794dbe833c..34ca23ced4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
@@ -55,6 +55,14 @@ public class TableSchemaOptions {
.noDefaultValue()
.withDescription("SeaTunnel Schema");
+ public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+ Options.key("tables_configs")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription(
+ "SeaTunnel Multi Table Schema, acts on unstructed
data sources. "
+ + "such as file, assert, mongodb, etc");
+
// We should use ColumnOptions instead of FieldOptions
@Deprecated
public static class FieldOptions {
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
index d9fcea69ae..a35e91837f 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -22,7 +22,6 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import java.util.List;
import java.util.Map;
public class AssertConfig {
@@ -85,13 +84,6 @@ public class AssertConfig {
.withDescription(
"Rule definition of user's available data. Each
rule represents one field validation or row num validation.");
- public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
- Options.key("tables_configs")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription(
- "Table configuration for the sink. Each table
configuration contains the table name and the rules for the table.");
-
public static final Option<String> TABLE_PATH =
Options.key("table_path")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index e84b6fbcb2..8da98df73e 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -42,11 +42,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import static
org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TABLE_CONFIGS;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
-import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_CONFIGS;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_PATH;
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
index fe956152a8..9c05c86bb6 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
@@ -27,12 +27,6 @@ import java.util.Map;
public class FakeOption {
- public static final Option<List<Map<String, Object>>> TABLES_CONFIGS =
- Options.key("tables_configs")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription("The multiple table config list of fake
source");
-
public static final Option<List<Map<String, Object>>> ROWS =
Options.key("rows")
.type(new TypeReference<List<Map<String, Object>>>() {})
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
index 051d88a88f..6459e46566 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.commons.collections4.CollectionUtils;
@@ -36,7 +37,7 @@ public class MultipleTableFakeSourceConfig implements
Serializable {
@Getter private List<FakeConfig> fakeConfigs;
public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) {
- if
(fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) {
+ if
(fakeSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent())
{
parseFromConfigs(fakeSourceRootConfig);
} else {
parseFromConfig(fakeSourceRootConfig);
@@ -56,7 +57,7 @@ public class MultipleTableFakeSourceConfig implements
Serializable {
private void parseFromConfigs(ReadonlyConfig readonlyConfig) {
List<ReadonlyConfig> readonlyConfigs =
-
readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream()
+
readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).get().stream()
.map(ReadonlyConfig::fromMap)
.collect(Collectors.toList());
// Use the config outside if it's not set in sub config
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
index 73af0b0cd5..4ea71dda5b 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
@@ -54,7 +54,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.S
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE;
-import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE;
import static
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE;
@@ -72,7 +71,7 @@ public class FakeSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .optional(TABLES_CONFIGS)
+ .optional(TableSchemaOptions.TABLE_CONFIGS)
.optional(TableSchemaOptions.SCHEMA)
.optional(STRING_FAKE_MODE)
.conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE,
STRING_TEMPLATE)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
index 0cda71d091..f44e0d1f6f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import com.google.common.collect.Lists;
import lombok.Getter;
@@ -33,7 +34,7 @@ public abstract class BaseMultipleTableFileSourceConfig
implements Serializable
@Getter private List<BaseFileSourceConfig> fileSourceConfigs;
public BaseMultipleTableFileSourceConfig(ReadonlyConfig
fileSourceRootConfig) {
- if
(fileSourceRootConfig.getOptional(BaseSourceConfigOptions.TABLE_CONFIGS).isPresent())
{
+ if
(fileSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent())
{
parseFromFileSourceConfigs(fileSourceRootConfig);
} else {
parseFromFileSourceConfig(fileSourceRootConfig);
@@ -42,7 +43,7 @@ public abstract class BaseMultipleTableFileSourceConfig
implements Serializable
private void parseFromFileSourceConfigs(ReadonlyConfig
fileSourceRootConfig) {
this.fileSourceConfigs =
-
fileSourceRootConfig.get(BaseSourceConfigOptions.TABLE_CONFIGS).stream()
+
fileSourceRootConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream()
.map(ReadonlyConfig::fromMap)
.map(this::getBaseSourceConfig)
.collect(Collectors.toList());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index ddcc13d47d..de45726e3c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -27,7 +25,6 @@ import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import java.util.List;
-import java.util.Map;
public class BaseSourceConfigOptions {
public static final Option<FileFormat> FILE_FORMAT_TYPE =
@@ -169,11 +166,4 @@ public class BaseSourceConfigOptions {
.enumType(ArchiveCompressFormat.class)
.defaultValue(ArchiveCompressFormat.NONE)
.withDescription("Archive compression codec");
-
- public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
- Options.key("tables_configs")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription(
- "Local file source configs, used to create
multiple local file source.");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index fb76d276d5..0d58e506da 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -50,7 +50,7 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .optional(BaseSourceConfigOptions.TABLE_CONFIGS)
+ .optional(TableSchemaOptions.TABLE_CONFIGS)
.optional(BaseSourceConfigOptions.FILE_PATH)
.optional(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
.optional(BaseSourceConfigOptions.ENCODING)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 0eddf05693..6f140330cc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -51,9 +51,7 @@ public class OssFileSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .optional(
- org.apache.seatunnel.connectors.seatunnel.file.config
- .BaseSourceConfigOptions.TABLE_CONFIGS)
+ .optional(TableSchemaOptions.TABLE_CONFIGS)
.optional(OssConfigOptions.FILE_PATH)
.optional(OssConfigOptions.BUCKET)
.optional(OssConfigOptions.ACCESS_KEY)
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
rename to
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
index efed4e91c5..6fe55e2e71 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
-public class BaseHiveOptions extends BaseSourceConfigOptions {
+public class HiveOptions extends BaseSourceConfigOptions {
public static final Option<String> TABLE_NAME =
Options.key("table_name")
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 6e91baf001..13f48823b2 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -41,9 +41,9 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
import
org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.hive.sink.writter.HiveSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
@@ -216,16 +216,14 @@ public class HiveSink
StorageFactory.getStorageType(hdfsLocation)
.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
readonlyConfig
- .getOptional(HiveSourceOptions.HDFS_SITE_PATH)
+ .getOptional(HiveOptions.HDFS_SITE_PATH)
.ifPresent(hadoopConf::setHdfsSitePath);
+
readonlyConfig.getOptional(HiveOptions.REMOTE_USER).ifPresent(hadoopConf::setRemoteUser);
readonlyConfig
- .getOptional(HiveSourceOptions.REMOTE_USER)
- .ifPresent(hadoopConf::setRemoteUser);
- readonlyConfig
- .getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL)
+ .getOptional(HiveOptions.KERBEROS_PRINCIPAL)
.ifPresent(hadoopConf::setKerberosPrincipal);
readonlyConfig
- .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
+ .getOptional(HiveOptions.KERBEROS_KEYTAB_PATH)
.ifPresent(hadoopConf::setKerberosKeytabPath);
return hadoopConf;
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
index a241717a44..404244b411 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.hive.sink;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
-public class HiveSinkOptions extends BaseHiveOptions {
+public class HiveSinkOptions extends HiveOptions {
public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA =
Options.key("abort_drop_partition_metadata")
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index 07adfef106..63e235d3dc 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -27,7 +29,6 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
import com.google.auto.service.AutoService;
@@ -51,7 +52,7 @@ public class HiveSourceFactory implements TableSourceFactory {
return OptionRule.builder()
.optional(HiveConfig.TABLE_NAME)
.optional(HiveConfig.METASTORE_URI)
- .optional(HiveSourceOptions.TABLE_CONFIGS)
+ .optional(TableSchemaOptions.TABLE_CONFIGS,
CatalogOptions.TABLE_LIST)
.optional(BaseSourceConfigOptions.READ_PARTITIONS)
.optional(BaseSourceConfigOptions.READ_COLUMNS)
.optional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL)
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
deleted file mode 100644
index c30cb1783d..0000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
-
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
-
-import java.util.List;
-import java.util.Map;
-
-public class HiveSourceOptions extends BaseHiveOptions {
- public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
- Options.key("tables_configs")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription(
- "Local file source configs, used to create
multiple local file source.");
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
index 9db899ca8c..249ffed497 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import com.google.common.collect.Lists;
import lombok.Getter;
@@ -33,16 +35,27 @@ public class MultipleTableHiveSourceConfig implements Serializable {
@Getter private List<HiveSourceConfig> hiveSourceConfigs;
public MultipleTableHiveSourceConfig(ReadonlyConfig readonlyConfig) {
- if (readonlyConfig.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent()) {
- parseFromLocalFileSourceConfigs(readonlyConfig);
+ if (readonlyConfig.getOptional(CatalogOptions.TABLE_LIST).isPresent()) {
+ parseFromLocalFileSourceByTableList(readonlyConfig);
+ } else if (readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) {
+ parseFromLocalFileSourceByTableConfigs(readonlyConfig);
} else {
parseFromLocalFileSourceConfig(readonlyConfig);
}
}
- private void parseFromLocalFileSourceConfigs(ReadonlyConfig readonlyConfig) {
+ private void parseFromLocalFileSourceByTableList(ReadonlyConfig readonlyConfig) {
this.hiveSourceConfigs =
- readonlyConfig.get(HiveSourceOptions.TABLE_CONFIGS).stream()
+ readonlyConfig.get(CatalogOptions.TABLE_LIST).stream()
+ .map(ReadonlyConfig::fromMap)
+ .map(HiveSourceConfig::new)
+ .collect(Collectors.toList());
+ }
+ // hive is structured, should use table_list
+ @Deprecated
+ private void parseFromLocalFileSourceByTableConfigs(ReadonlyConfig readonlyConfig) {
+ this.hiveSourceConfigs =
+ readonlyConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream()
.map(ReadonlyConfig::fromMap)
.map(HiveSourceConfig::new)
.collect(Collectors.toList());
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 62d917ca0d..18482aa2c7 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -54,7 +54,7 @@ public class HiveMetaStoreProxy {
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("hive-site.xml");
private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
- String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
+ String metastoreUri = readonlyConfig.get(HiveOptions.METASTORE_URI);
String hiveHadoopConfigPath =
readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH);
String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
HiveConf hiveConf = new HiveConf();
@@ -121,7 +121,7 @@ public class HiveMetaStoreProxy {
String.format(
"Using this hive uris [%s], hive conf [%s] to initialize "
+ "hive metastore client instance failed",
- metastoreUri, readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
+ metastoreUri, readonlyConfig.get(HiveOptions.HIVE_SITE_PATH));
throw new HiveConnectorException(
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
} catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
index 7b9192ea64..0805fe04f3 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
@@ -23,16 +23,16 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
import org.apache.hadoop.hive.metastore.api.Table;
public class HiveTableUtils {
public static Table getTableInfo(ReadonlyConfig readonlyConfig) {
- String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME);
+ String table = readonlyConfig.get(HiveOptions.TABLE_NAME);
TablePath tablePath = TablePath.of(table);
if (tablePath.getDatabaseName() == null || tablePath.getTableName() == null) {
throw new SeaTunnelRuntimeException(
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 293821e0ed..c01dc3e88d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -184,13 +184,6 @@ public class Config {
.withDescription(
"Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
- public static final Option<List<Map<String, Object>>> TABLE_LIST =
- Options.key("table_list")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription(
- "Topic list config. You can configure only one `table_list` or one `topic` at the same time");
-
public static final Option<String> PROTOBUF_SCHEMA =
Options.key("protobuf_schema")
.stringType()
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 0f645d7218..1093d3f2f2 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
@@ -31,7 +32,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
@@ -114,11 +114,17 @@ public class KafkaSourceConfig implements Serializable {
private Map<TablePath, ConsumerMetadata> createMapConsumerMetadata(
ReadonlyConfig readonlyConfig) {
List<ConsumerMetadata> consumerMetadataList;
- if (readonlyConfig.getOptional(Config.TABLE_LIST).isPresent()) {
+ if (readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) {
consumerMetadataList =
- readonlyConfig.get(Config.TABLE_LIST).stream()
+ readonlyConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream()
.map(ReadonlyConfig::fromMap)
- .map(config -> createConsumerMetadata(config))
+ .map(this::createConsumerMetadata)
+ .collect(Collectors.toList());
+ } else if (readonlyConfig.getOptional(CatalogOptions.TABLE_LIST).isPresent()) {
+ consumerMetadataList =
+ readonlyConfig.get(CatalogOptions.TABLE_LIST).stream()
+ .map(ReadonlyConfig::fromMap)
+ .map(this::createConsumerMetadata)
.collect(Collectors.toList());
} else {
consumerMetadataList =
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 431e9a8c19..fe6f50a8ea 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -43,7 +45,8 @@ public class KafkaSourceFactory implements TableSourceFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(Config.BOOTSTRAP_SERVERS)
- .exclusive(Config.TOPIC, Config.TABLE_LIST)
+ .exclusive(
+ Config.TOPIC, TableSchemaOptions.TABLE_CONFIGS, CatalogOptions.TABLE_LIST)
.optional(
Config.START_MODE,
Config.PATTERN,
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
index 5abc62ad72..3fd783bbf8 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -29,7 +27,6 @@ import lombok.Getter;
import lombok.ToString;
import java.util.List;
-import java.util.Map;
@Getter
@ToString
@@ -55,12 +52,6 @@ public class KuduSourceConfig extends CommonConfig {
.noDefaultValue()
.withDescription("Kudu scan filter expressions");
- public static final Option<List<Map<String, Object>>> TABLE_LIST =
- Options.key("table_list")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription("table list config");
-
private int batchSizeBytes;
protected Long queryTimeout;
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
index b741b7474b..094807edc0 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -59,8 +60,8 @@ public class KuduSourceTableConfig implements Serializable {
try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) {
kuduCatalog.open();
- if (config.getOptional(KuduSourceConfig.TABLE_LIST).isPresent()) {
- return config.get(KuduSourceConfig.TABLE_LIST).stream()
+ if (config.getOptional(CatalogOptions.TABLE_LIST).isPresent()) {
+ return config.get(CatalogOptions.TABLE_LIST).stream()
.map(ReadonlyConfig::fromMap)
.map(readonlyConfig ->
parseKuduSourceConfig(readonlyConfig, kuduCatalog))
.collect(Collectors.toList());
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
index b1bdb7e4ab..78002a9390 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
@@ -33,8 +33,8 @@ import com.google.auto.service.AutoService;
import java.io.Serializable;
+import static org.apache.seatunnel.api.table.catalog.CatalogOptions.TABLE_LIST;
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER;
-import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_LIST;
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;
@AutoService(Factory.class)