This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1c6176e518 KafkaSource use Factory to create source (#5635)
1c6176e518 is described below

commit 1c6176e5188ca9481b7b7d5d0eeb4e4a27ef2f5c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Oct 21 10:22:37 2023 +0800

    KafkaSource use Factory to create source (#5635)
---
 .../connectors/seatunnel/kafka/config/Config.java  |   8 +-
 .../seatunnel/kafka/source/KafkaSource.java        | 247 +++------------------
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 238 ++++++++++++++++++++
 .../seatunnel/kafka/source/KafkaSourceFactory.java |  11 +
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  17 ++
 ...ource_text_to_console_assert_catalog_table.conf | 169 ++++++++++++++
 ...ompatibleKafkaConnectDeserializationSchema.java |  13 +-
 7 files changed, 472 insertions(+), 231 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index f126e563fb..1bf0c19fc2 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
@@ -106,7 +108,7 @@ public class Config {
     public static final Option<String> FIELD_DELIMITER =
             Options.key("field_delimiter")
                     .stringType()
-                    .noDefaultValue()
+                    .defaultValue(DEFAULT_FIELD_DELIMITER)
                     .withDescription("Customize the field delimiter for data 
format.");
 
     public static final Option<Integer> PARTITION =
@@ -145,9 +147,9 @@ public class Config {
                     .noDefaultValue()
                     .withDescription("The time required for consumption mode 
to be timestamp.");
 
-    public static final Option<Config> START_MODE_OFFSETS =
+    public static final Option<Map<String, Long>> START_MODE_OFFSETS =
             Options.key("start_mode.offsets")
-                    .objectType(Config.class)
+                    .type(new TypeReference<Map<String, Long>>() {})
                     .noDefaultValue()
                     .withDescription(
                             "The offset required for consumption mode to be 
specific_offsets.");
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 802d7986a9..6df9809ea2 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -17,79 +17,36 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
-import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
-import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
-import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.apache.seatunnel.format.text.TextDeserializationSchema;
-import org.apache.seatunnel.format.text.constant.TextFormatConstant;
-
-import org.apache.kafka.common.TopicPartition;
 
 import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import java.util.List;
 
 @AutoService(SeaTunnelSource.class)
 public class KafkaSource
         implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, 
KafkaSourceState>,
                 SupportParallelism {
 
-    private final ConsumerMetadata metadata = new ConsumerMetadata();
-    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
-    private SeaTunnelRowType typeInfo;
     private JobContext jobContext;
-    private long discoveryIntervalMillis = 
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
-    private MessageFormatErrorHandleWay messageFormatErrorHandleWay =
-            MessageFormatErrorHandleWay.FAIL;
+
+    private final KafkaSourceConfig kafkaSourceConfig;
+
+    public KafkaSource(ReadonlyConfig readonlyConfig) {
+        kafkaSourceConfig = new KafkaSourceConfig(readonlyConfig);
+    }
 
     @Override
     public Boundedness getBoundedness() {
@@ -104,195 +61,47 @@ public class KafkaSource
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, TOPIC.key(), 
BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.metadata.setTopic(config.getString(TOPIC.key()));
-        if (config.hasPath(PATTERN.key())) {
-            this.metadata.setPattern(config.getBoolean(PATTERN.key()));
-        } else {
-            this.metadata.setPattern(PATTERN.defaultValue());
-        }
-        
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
-        this.metadata.setProperties(new Properties());
-
-        if (config.hasPath(CONSUMER_GROUP.key())) {
-            
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
-        } else {
-            this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
-        }
-
-        if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {
-            
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT.key()));
-        } else {
-            
this.metadata.setCommitOnCheckpoint(COMMIT_ON_CHECKPOINT.defaultValue());
-        }
-
-        if (config.hasPath(START_MODE.key())) {
-            StartMode startMode =
-                    
StartMode.valueOf(config.getString(START_MODE.key()).toUpperCase());
-            this.metadata.setStartMode(startMode);
-            switch (startMode) {
-                case TIMESTAMP:
-                    long startOffsetsTimestamp = 
config.getLong(START_MODE_TIMESTAMP.key());
-                    long currentTimestamp = System.currentTimeMillis();
-                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > 
currentTimestamp) {
-                        throw new IllegalArgumentException(
-                                "start_mode.timestamp The value is smaller 
than 0 or smaller than the current time");
-                    }
-                    
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
-                    break;
-                case SPECIFIC_OFFSETS:
-                    Config offsets = 
config.getConfig(START_MODE_OFFSETS.key());
-                    ConfigRenderOptions options = 
ConfigRenderOptions.concise();
-                    String offsetsJson = offsets.root().render(options);
-                    if (offsetsJson == null) {
-                        throw new IllegalArgumentException(
-                                "start mode is "
-                                        + StartMode.SPECIFIC_OFFSETS
-                                        + "but no specific offsets were 
specified.");
-                    }
-                    Map<TopicPartition, Long> specificStartOffsets = new 
HashMap<>();
-                    ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
-                    jsonNodes
-                            .fieldNames()
-                            .forEachRemaining(
-                                    key -> {
-                                        int splitIndex = key.lastIndexOf("-");
-                                        String topic = key.substring(0, 
splitIndex);
-                                        String partition = 
key.substring(splitIndex + 1);
-                                        long offset = 
jsonNodes.get(key).asLong();
-                                        TopicPartition topicPartition =
-                                                new TopicPartition(
-                                                        topic, 
Integer.valueOf(partition));
-                                        
specificStartOffsets.put(topicPartition, offset);
-                                    });
-                    
this.metadata.setSpecificStartOffsets(specificStartOffsets);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
-            this.discoveryIntervalMillis =
-                    
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
-        }
-
-        if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
-            config.getObject(KAFKA_CONFIG.key())
-                    .forEach(
-                            (key, value) ->
-                                    this.metadata.getProperties().put(key, 
value.unwrapped()));
-        }
-
-        if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
-            MessageFormatErrorHandleWay formatErrorWayOption =
-                    
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
-            switch (formatErrorWayOption) {
-                case FAIL:
-                case SKIP:
-                    this.messageFormatErrorHandleWay = formatErrorWayOption;
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        setDeserialization(config);
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Lists.newArrayList(kafkaSourceConfig.getCatalogTable());
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return kafkaSourceConfig.getCatalogTable().getSeaTunnelRowType();
     }
 
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
-            SourceReader.Context readerContext) throws Exception {
+            SourceReader.Context readerContext) {
         return new KafkaSourceReader(
-                this.metadata, deserializationSchema, readerContext, 
messageFormatErrorHandleWay);
+                kafkaSourceConfig.getMetadata(),
+                kafkaSourceConfig.getDeserializationSchema(),
+                readerContext,
+                kafkaSourceConfig.getMessageFormatErrorHandleWay());
     }
 
     @Override
     public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> 
createEnumerator(
-            SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) 
throws Exception {
+            SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) 
{
         return new KafkaSourceSplitEnumerator(
-                this.metadata, enumeratorContext, discoveryIntervalMillis);
+                kafkaSourceConfig.getMetadata(),
+                enumeratorContext,
+                kafkaSourceConfig.getDiscoveryIntervalMillis());
     }
 
     @Override
     public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> 
restoreEnumerator(
             SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
-            KafkaSourceState checkpointState)
-            throws Exception {
+            KafkaSourceState checkpointState) {
         return new KafkaSourceSplitEnumerator(
-                this.metadata, enumeratorContext, checkpointState, 
discoveryIntervalMillis);
+                kafkaSourceConfig.getMetadata(),
+                enumeratorContext,
+                checkpointState,
+                kafkaSourceConfig.getDiscoveryIntervalMillis());
     }
 
     @Override
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
     }
-
-    private void setDeserialization(Config config) {
-        if (config.hasPath(SCHEMA.key())) {
-            Config schema = config.getConfig(SCHEMA.key());
-            // todo: use KafkaDataTypeConvertor here?
-            typeInfo = 
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
-            MessageFormat format = 
ReadonlyConfig.fromConfig(config).get(FORMAT);
-            switch (format) {
-                case JSON:
-                    deserializationSchema = new 
JsonDeserializationSchema(false, false, typeInfo);
-                    break;
-                case TEXT:
-                    String delimiter = DEFAULT_FIELD_DELIMITER;
-                    if (config.hasPath(FIELD_DELIMITER.key())) {
-                        delimiter = config.getString(FIELD_DELIMITER.key());
-                    }
-                    deserializationSchema =
-                            TextDeserializationSchema.builder()
-                                    .seaTunnelRowType(typeInfo)
-                                    .delimiter(delimiter)
-                                    .build();
-                    break;
-                case CANAL_JSON:
-                    deserializationSchema =
-                            CanalJsonDeserializationSchema.builder(typeInfo)
-                                    .setIgnoreParseErrors(true)
-                                    .build();
-                    break;
-                case COMPATIBLE_KAFKA_CONNECT_JSON:
-                    deserializationSchema =
-                            new CompatibleKafkaConnectDeserializationSchema(
-                                    typeInfo, config, false, false);
-                    break;
-                case DEBEZIUM_JSON:
-                    boolean includeSchema = 
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
-                    if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
-                        includeSchema = 
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
-                    }
-                    deserializationSchema =
-                            new DebeziumJsonDeserializationSchema(typeInfo, 
true, includeSchema);
-                    break;
-                default:
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
"Unsupported format: " + format);
-            }
-        } else {
-            typeInfo = CatalogTableUtil.buildSimpleTextSchema();
-            this.deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            .seaTunnelRowType(typeInfo)
-                            .delimiter(TextFormatConstant.PLACEHOLDER)
-                            .build();
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
new file mode 100644
index 0000000000..d5866061fe
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+import 
org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.kafka.common.TopicPartition;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
+public class KafkaSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private final ConsumerMetadata metadata;
+
+    @Getter private final DeserializationSchema<SeaTunnelRow> 
deserializationSchema;
+
+    @Getter private final CatalogTable catalogTable;
+
+    @Getter private final MessageFormatErrorHandleWay 
messageFormatErrorHandleWay;
+
+    @Getter private final long discoveryIntervalMillis;
+
+    public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
+        this.metadata = createConsumerMetadata(readonlyConfig);
+        this.discoveryIntervalMillis = 
readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
+        this.messageFormatErrorHandleWay =
+                readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+        this.catalogTable = createCatalogTable(readonlyConfig);
+        this.deserializationSchema = createDeserializationSchema(catalogTable, 
readonlyConfig);
+    }
+
+    private ConsumerMetadata createConsumerMetadata(ReadonlyConfig 
readonlyConfig) {
+        ConsumerMetadata consumerMetadata = new ConsumerMetadata();
+        consumerMetadata.setTopic(readonlyConfig.get(TOPIC));
+        
consumerMetadata.setBootstrapServers(readonlyConfig.get(BOOTSTRAP_SERVERS));
+        consumerMetadata.setPattern(readonlyConfig.get(PATTERN));
+        consumerMetadata.setProperties(new Properties());
+        consumerMetadata.setConsumerGroup(readonlyConfig.get(CONSUMER_GROUP));
+        
consumerMetadata.setCommitOnCheckpoint(readonlyConfig.get(COMMIT_ON_CHECKPOINT));
+        // parse start mode
+        readonlyConfig
+                .getOptional(START_MODE)
+                .ifPresent(
+                        startMode -> {
+                            consumerMetadata.setStartMode(startMode);
+                            switch (startMode) {
+                                case TIMESTAMP:
+                                    long startOffsetsTimestamp =
+                                            
readonlyConfig.get(START_MODE_TIMESTAMP);
+                                    long currentTimestamp = 
System.currentTimeMillis();
+                                    if (startOffsetsTimestamp < 0
+                                            || startOffsetsTimestamp > 
currentTimestamp) {
+                                        throw new IllegalArgumentException(
+                                                "start_mode.timestamp The 
value is less than 0 or greater than the current time");
+                                    }
+                                    consumerMetadata.setStartOffsetsTimestamp(
+                                            startOffsetsTimestamp);
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    // Key is topic-partition, value is offset
+                                    Map<String, Long> offsetMap =
+                                            
readonlyConfig.get(START_MODE_OFFSETS);
+                                    if (MapUtils.isEmpty(offsetMap)) {
+                                        throw new IllegalArgumentException(
+                                                "start mode is "
+                                                        + 
StartMode.SPECIFIC_OFFSETS
+                                                        + " but no specific 
offsets were specified.");
+                                    }
+                                    Map<TopicPartition, Long> 
specificStartOffsets =
+                                            new HashMap<>();
+                                    offsetMap.forEach(
+                                            (topicPartitionKey, offset) -> {
+                                                int splitIndex = 
topicPartitionKey.lastIndexOf("-");
+                                                String topic =
+                                                        
topicPartitionKey.substring(0, splitIndex);
+                                                String partition =
+                                                        
topicPartitionKey.substring(splitIndex + 1);
+                                                TopicPartition topicPartition =
+                                                        new TopicPartition(
+                                                                topic, 
Integer.parseInt(partition));
+                                                
specificStartOffsets.put(topicPartition, offset);
+                                            });
+                                    
consumerMetadata.setSpecificStartOffsets(specificStartOffsets);
+                                    break;
+                                default:
+                                    break;
+                            }
+                        });
+
+        readonlyConfig
+                .getOptional(KAFKA_CONFIG)
+                .ifPresent(
+                        kafkaConfig ->
+                                kafkaConfig.forEach(
+                                        (key, value) ->
+                                                
consumerMetadata.getProperties().put(key, value)));
+
+        return consumerMetadata;
+    }
+
+    private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
+        Optional<Map<String, Object>> schemaOptions =
+                readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
+        if (schemaOptions.isPresent()) {
+            return CatalogTableUtil.buildWithConfig(readonlyConfig);
+        } else {
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(CONNECTOR_IDENTITY, null, null);
+            TableSchema tableSchema =
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "content",
+                                            new SeaTunnelRowType(
+                                                    new String[] {"content"},
+                                                    new SeaTunnelDataType<?>[] 
{
+                                                        BasicType.STRING_TYPE
+                                                    }),
+                                            0,
+                                            false,
+                                            null,
+                                            null))
+                            .build();
+            return CatalogTable.of(
+                    tableIdentifier,
+                    tableSchema,
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    null);
+        }
+    }
+
+    private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
+            CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+
+        if 
(!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+            return TextDeserializationSchema.builder()
+                    .seaTunnelRowType(seaTunnelRowType)
+                    .delimiter(TextFormatConstant.PLACEHOLDER)
+                    .build();
+        }
+
+        MessageFormat format = readonlyConfig.get(FORMAT);
+        switch (format) {
+            case JSON:
+                return new JsonDeserializationSchema(false, false, 
seaTunnelRowType);
+            case TEXT:
+                String delimiter = readonlyConfig.get(FIELD_DELIMITER);
+                return TextDeserializationSchema.builder()
+                        .seaTunnelRowType(seaTunnelRowType)
+                        .delimiter(delimiter)
+                        .build();
+            case CANAL_JSON:
+                return CanalJsonDeserializationSchema.builder(seaTunnelRowType)
+                        .setIgnoreParseErrors(true)
+                        .build();
+            case COMPATIBLE_KAFKA_CONNECT_JSON:
+                Boolean keySchemaEnable =
+                        readonlyConfig.get(
+                                
KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
+                Boolean valueSchemaEnable =
+                        readonlyConfig.get(
+                                
KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
+                return new CompatibleKafkaConnectDeserializationSchema(
+                        seaTunnelRowType, keySchemaEnable, valueSchemaEnable, 
false, false);
+            case DEBEZIUM_JSON:
+                boolean includeSchema = 
readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
+                return new DebeziumJsonDeserializationSchema(seaTunnelRowType, 
true, includeSchema);
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported 
format: " + format);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 21057040ec..4a41d38d6d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -19,13 +19,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class KafkaSourceFactory implements TableSourceFactory {
 
@@ -54,6 +59,12 @@ public class KafkaSourceFactory implements 
TableSourceFactory {
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
KafkaSource(context.getOptions());
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return KafkaSource.class;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index c72eb9d5c3..b75c55153e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -207,6 +207,23 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer 
container)
+            throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row -> new ProducerRecord<>("test_topic_text", null, 
serializer.serialize(row)),
+                0,
+                100);
+                Container.ExecResult execResult =
+                        container.executeJob(
+                                
"/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaJsonToConsole(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
new file mode 100644
index 0000000000..32649cfcc2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    format_error_handle_way = fail
+    schema = {
+      columns = [
+        {
+              name = id
+              type = bigint
+        }
+        {
+              name = c_map
+              type = "map<string, smallint>"
+        }
+        {
+              name = c_array
+              type = "array<tinyint>"
+        }
+        {
+              name = c_string
+              type = "string"
+        }
+        {
+              name = c_boolean
+              type = "boolean"
+        }
+        {
+              name = c_tinyint
+              type = "tinyint"
+        }
+        {
+              name = c_smallint
+              type = "smallint"
+        }
+        {
+              name = c_int
+              type = "int"
+        }
+        {
+              name = c_bigint
+              type = "bigint"
+        }
+        {
+              name = c_float
+              type = "float"
+        }
+        {
+              name = c_double
+              type = "double"
+        }
+        {
+              name = c_decimal
+              type = "decimal(2, 1)"
+        }
+        {
+              name = c_bytes
+              type = "bytes"
+        }
+        {
+              name = c_date
+              type = "date"
+        }
+        {
+              name = c_timestamp
+              type = "timestamp"
+        }
+      ]
+      primaryKey = {
+        name = "primary key"
+        columnNames = ["id"]
+      }
+      constraintKeys = [
+        {
+            constraintName = "unique_c_string"
+            constraintType = UNIQUE_KEY
+            constraintColumns = [
+                {
+                    columnName = "c_string"
+                    sortType = ASC
+                }
+            ]
+        }
+     ]
+    }
+    format = text
+    field_delimiter = ","
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "kafka_table"
+  }
+  Assert {
+    source_table_name = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              },
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      catalog_table_rule = {
+        primary_key_rule = {
+            primary_key_name = "primary key"
+            primary_key_columns = ["id"]
+        }
+        constraint_key_rule = [
+            {
+            constraint_key_name = "unique_c_string"
+            constraint_key_type = UNIQUE_KEY
+            constraint_key_columns = [
+                {
+                    constraint_key_column_name = "c_string"
+                    constraint_key_sort_type = ASC
+                }
+            ]
+            }
+        ]
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index b2e6ac97e9..896a3fe5e1 100644
--- 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -18,9 +18,7 @@
 package org.apache.seatunnel.format.compatible.kafka.connect.json;
 
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.RowKind;
@@ -49,7 +47,6 @@ import java.io.UnsupportedEncodingException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collections;
-import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -74,16 +71,14 @@ public class CompatibleKafkaConnectDeserializationSchema
 
     public CompatibleKafkaConnectDeserializationSchema(
             @NonNull SeaTunnelRowType seaTunnelRowType,
-            @NonNull Config config,
+            boolean keySchemaEnable,
+            boolean valueSchemaEnable,
             boolean failOnMissingField,
             boolean ignoreParseErrors) {
 
-        Map<String, String> configMap = 
ReadonlyConfig.fromConfig(config).toMap();
         this.seaTunnelRowType = seaTunnelRowType;
-        this.keySchemaEnable =
-                
KafkaConnectJsonFormatOptions.getKeyConverterSchemaEnabled(configMap);
-        this.valueSchemaEnable =
-                
KafkaConnectJsonFormatOptions.getValueConverterSchemaEnabled(configMap);
+        this.keySchemaEnable = keySchemaEnable;
+        this.valueSchemaEnable = valueSchemaEnable;
 
         // Runtime converter
         this.runtimeConverter =


Reply via email to