This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1c6176e518 KafkaSource use Factory to create source (#5635)
1c6176e518 is described below
commit 1c6176e5188ca9481b7b7d5d0eeb4e4a27ef2f5c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Oct 21 10:22:37 2023 +0800
KafkaSource use Factory to create source (#5635)
---
.../connectors/seatunnel/kafka/config/Config.java | 8 +-
.../seatunnel/kafka/source/KafkaSource.java | 247 +++------------------
.../seatunnel/kafka/source/KafkaSourceConfig.java | 238 ++++++++++++++++++++
.../seatunnel/kafka/source/KafkaSourceFactory.java | 11 +
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 17 ++
...ource_text_to_console_assert_catalog_table.conf | 169 ++++++++++++++
...ompatibleKafkaConnectDeserializationSchema.java | 13 +-
7 files changed, 472 insertions(+), 231 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index f126e563fb..1bf0c19fc2 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
@@ -106,7 +108,7 @@ public class Config {
public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
- .noDefaultValue()
+ .defaultValue(DEFAULT_FIELD_DELIMITER)
.withDescription("Customize the field delimiter for data
format.");
public static final Option<Integer> PARTITION =
@@ -145,9 +147,9 @@ public class Config {
.noDefaultValue()
.withDescription("The time required for consumption mode
to be timestamp.");
- public static final Option<Config> START_MODE_OFFSETS =
+ public static final Option<Map<String, Long>> START_MODE_OFFSETS =
Options.key("start_mode.offsets")
- .objectType(Config.class)
+ .type(new TypeReference<Map<String, Long>>() {})
.noDefaultValue()
.withDescription(
"The offset required for consumption mode to be
specific_offsets.");
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 802d7986a9..6df9809ea2 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -17,79 +17,36 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
-import
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
-import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
-import
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.apache.seatunnel.format.text.TextDeserializationSchema;
-import org.apache.seatunnel.format.text.constant.TextFormatConstant;
-
-import org.apache.kafka.common.TopicPartition;
import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import java.util.List;
@AutoService(SeaTunnelSource.class)
public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit,
KafkaSourceState>,
SupportParallelism {
- private final ConsumerMetadata metadata = new ConsumerMetadata();
- private DeserializationSchema<SeaTunnelRow> deserializationSchema;
- private SeaTunnelRowType typeInfo;
private JobContext jobContext;
- private long discoveryIntervalMillis =
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
- private MessageFormatErrorHandleWay messageFormatErrorHandleWay =
- MessageFormatErrorHandleWay.FAIL;
+
+ private final KafkaSourceConfig kafkaSourceConfig;
+
+ public KafkaSource(ReadonlyConfig readonlyConfig) {
+ kafkaSourceConfig = new KafkaSourceConfig(readonlyConfig);
+ }
@Override
public Boundedness getBoundedness() {
@@ -104,195 +61,47 @@ public class KafkaSource
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(config, TOPIC.key(),
BOOTSTRAP_SERVERS.key());
- if (!result.isSuccess()) {
- throw new KafkaConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.metadata.setTopic(config.getString(TOPIC.key()));
- if (config.hasPath(PATTERN.key())) {
- this.metadata.setPattern(config.getBoolean(PATTERN.key()));
- } else {
- this.metadata.setPattern(PATTERN.defaultValue());
- }
-
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
- this.metadata.setProperties(new Properties());
-
- if (config.hasPath(CONSUMER_GROUP.key())) {
-
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
- } else {
- this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
- }
-
- if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {
-
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT.key()));
- } else {
-
this.metadata.setCommitOnCheckpoint(COMMIT_ON_CHECKPOINT.defaultValue());
- }
-
- if (config.hasPath(START_MODE.key())) {
- StartMode startMode =
-
StartMode.valueOf(config.getString(START_MODE.key()).toUpperCase());
- this.metadata.setStartMode(startMode);
- switch (startMode) {
- case TIMESTAMP:
- long startOffsetsTimestamp =
config.getLong(START_MODE_TIMESTAMP.key());
- long currentTimestamp = System.currentTimeMillis();
- if (startOffsetsTimestamp < 0 || startOffsetsTimestamp >
currentTimestamp) {
- throw new IllegalArgumentException(
- "start_mode.timestamp The value is smaller
than 0 or smaller than the current time");
- }
-
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
- break;
- case SPECIFIC_OFFSETS:
- Config offsets =
config.getConfig(START_MODE_OFFSETS.key());
- ConfigRenderOptions options =
ConfigRenderOptions.concise();
- String offsetsJson = offsets.root().render(options);
- if (offsetsJson == null) {
- throw new IllegalArgumentException(
- "start mode is "
- + StartMode.SPECIFIC_OFFSETS
- + "but no specific offsets were
specified.");
- }
- Map<TopicPartition, Long> specificStartOffsets = new
HashMap<>();
- ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
- jsonNodes
- .fieldNames()
- .forEachRemaining(
- key -> {
- int splitIndex = key.lastIndexOf("-");
- String topic = key.substring(0,
splitIndex);
- String partition =
key.substring(splitIndex + 1);
- long offset =
jsonNodes.get(key).asLong();
- TopicPartition topicPartition =
- new TopicPartition(
- topic,
Integer.valueOf(partition));
-
specificStartOffsets.put(topicPartition, offset);
- });
-
this.metadata.setSpecificStartOffsets(specificStartOffsets);
- break;
- default:
- break;
- }
- }
-
- if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
- this.discoveryIntervalMillis =
-
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
- }
-
- if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
- config.getObject(KAFKA_CONFIG.key())
- .forEach(
- (key, value) ->
- this.metadata.getProperties().put(key,
value.unwrapped()));
- }
-
- if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
- MessageFormatErrorHandleWay formatErrorWayOption =
-
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
- switch (formatErrorWayOption) {
- case FAIL:
- case SKIP:
- this.messageFormatErrorHandleWay = formatErrorWayOption;
- break;
- default:
- break;
- }
- }
-
- setDeserialization(config);
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Lists.newArrayList(kafkaSourceConfig.getCatalogTable());
}
@Override
- public SeaTunnelRowType getProducedType() {
- return this.typeInfo;
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return kafkaSourceConfig.getCatalogTable().getSeaTunnelRowType();
}
@Override
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
- SourceReader.Context readerContext) throws Exception {
+ SourceReader.Context readerContext) {
return new KafkaSourceReader(
- this.metadata, deserializationSchema, readerContext,
messageFormatErrorHandleWay);
+ kafkaSourceConfig.getMetadata(),
+ kafkaSourceConfig.getDeserializationSchema(),
+ readerContext,
+ kafkaSourceConfig.getMessageFormatErrorHandleWay());
}
@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
createEnumerator(
- SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext)
throws Exception {
+ SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext)
{
return new KafkaSourceSplitEnumerator(
- this.metadata, enumeratorContext, discoveryIntervalMillis);
+ kafkaSourceConfig.getMetadata(),
+ enumeratorContext,
+ kafkaSourceConfig.getDiscoveryIntervalMillis());
}
@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
restoreEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
- KafkaSourceState checkpointState)
- throws Exception {
+ KafkaSourceState checkpointState) {
return new KafkaSourceSplitEnumerator(
- this.metadata, enumeratorContext, checkpointState,
discoveryIntervalMillis);
+ kafkaSourceConfig.getMetadata(),
+ enumeratorContext,
+ checkpointState,
+ kafkaSourceConfig.getDiscoveryIntervalMillis());
}
@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}
-
- private void setDeserialization(Config config) {
- if (config.hasPath(SCHEMA.key())) {
- Config schema = config.getConfig(SCHEMA.key());
- // todo: use KafkaDataTypeConvertor here?
- typeInfo =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
- MessageFormat format =
ReadonlyConfig.fromConfig(config).get(FORMAT);
- switch (format) {
- case JSON:
- deserializationSchema = new
JsonDeserializationSchema(false, false, typeInfo);
- break;
- case TEXT:
- String delimiter = DEFAULT_FIELD_DELIMITER;
- if (config.hasPath(FIELD_DELIMITER.key())) {
- delimiter = config.getString(FIELD_DELIMITER.key());
- }
- deserializationSchema =
- TextDeserializationSchema.builder()
- .seaTunnelRowType(typeInfo)
- .delimiter(delimiter)
- .build();
- break;
- case CANAL_JSON:
- deserializationSchema =
- CanalJsonDeserializationSchema.builder(typeInfo)
- .setIgnoreParseErrors(true)
- .build();
- break;
- case COMPATIBLE_KAFKA_CONNECT_JSON:
- deserializationSchema =
- new CompatibleKafkaConnectDeserializationSchema(
- typeInfo, config, false, false);
- break;
- case DEBEZIUM_JSON:
- boolean includeSchema =
DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
- if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
- includeSchema =
config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
- }
- deserializationSchema =
- new DebeziumJsonDeserializationSchema(typeInfo,
true, includeSchema);
- break;
- default:
- throw new SeaTunnelJsonFormatException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported format: " + format);
- }
- } else {
- typeInfo = CatalogTableUtil.buildSimpleTextSchema();
- this.deserializationSchema =
- TextDeserializationSchema.builder()
- .seaTunnelRowType(typeInfo)
- .delimiter(TextFormatConstant.PLACEHOLDER)
- .build();
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
new file mode 100644
index 0000000000..d5866061fe
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+import
org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.kafka.common.TopicPartition;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
+/**
+ * Serializable holder for the settings that {@link KafkaSource} derives from the user supplied
+ * {@link ReadonlyConfig}: the consumer metadata, the produced catalog table, the deserialization
+ * schema, the message-format error handling mode and the partition discovery interval.
+ *
+ * <p>Every value is resolved in the constructor, so an invalid option (for example a malformed
+ * {@code start_mode.offsets} key) is reported when the source is created.
+ */
+public class KafkaSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private final ConsumerMetadata metadata;
+
+    @Getter private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    @Getter private final CatalogTable catalogTable;
+
+    @Getter private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
+
+    @Getter private final long discoveryIntervalMillis;
+
+    /**
+     * Resolves all Kafka source settings from the given options.
+     *
+     * @param readonlyConfig the options of the Kafka source
+     * @throws IllegalArgumentException if the start-mode related options are invalid
+     */
+    public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
+        this.metadata = createConsumerMetadata(readonlyConfig);
+        this.discoveryIntervalMillis = readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
+        this.messageFormatErrorHandleWay =
+                readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+        // The deserialization schema is derived from the catalog table, so build the table first.
+        this.catalogTable = createCatalogTable(readonlyConfig);
+        this.deserializationSchema = createDeserializationSchema(catalogTable, readonlyConfig);
+    }
+
+    /**
+     * Builds the consumer metadata (topic, servers, group, start position and extra Kafka
+     * properties) from the options.
+     */
+    private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
+        ConsumerMetadata consumerMetadata = new ConsumerMetadata();
+        consumerMetadata.setTopic(readonlyConfig.get(TOPIC));
+        consumerMetadata.setBootstrapServers(readonlyConfig.get(BOOTSTRAP_SERVERS));
+        consumerMetadata.setPattern(readonlyConfig.get(PATTERN));
+        consumerMetadata.setProperties(new Properties());
+        consumerMetadata.setConsumerGroup(readonlyConfig.get(CONSUMER_GROUP));
+        consumerMetadata.setCommitOnCheckpoint(readonlyConfig.get(COMMIT_ON_CHECKPOINT));
+
+        // The start mode is only applied when the user configured one explicitly.
+        readonlyConfig
+                .getOptional(START_MODE)
+                .ifPresent(
+                        startMode ->
+                                applyStartMode(consumerMetadata, startMode, readonlyConfig));
+
+        // Copy the user supplied Kafka client settings into the consumer properties.
+        readonlyConfig
+                .getOptional(KAFKA_CONFIG)
+                .ifPresent(
+                        kafkaConfig ->
+                                kafkaConfig.forEach(
+                                        (key, value) ->
+                                                consumerMetadata.getProperties().put(key, value)));
+
+        return consumerMetadata;
+    }
+
+    /**
+     * Stores the start mode and, for {@code TIMESTAMP} and {@code SPECIFIC_OFFSETS}, validates and
+     * stores the companion option.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative or lies in the future, or if
+     *     the specific offsets are missing or malformed
+     */
+    private void applyStartMode(
+            ConsumerMetadata consumerMetadata, StartMode startMode, ReadonlyConfig readonlyConfig) {
+        consumerMetadata.setStartMode(startMode);
+        switch (startMode) {
+            case TIMESTAMP:
+                long startOffsetsTimestamp = readonlyConfig.get(START_MODE_TIMESTAMP);
+                long currentTimestamp = System.currentTimeMillis();
+                if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+                    throw new IllegalArgumentException(
+                            "start_mode.timestamp must not be smaller than 0 or larger than the"
+                                    + " current time, but was: "
+                                    + startOffsetsTimestamp);
+                }
+                consumerMetadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+                break;
+            case SPECIFIC_OFFSETS:
+                consumerMetadata.setSpecificStartOffsets(
+                        parseSpecificStartOffsets(readonlyConfig.get(START_MODE_OFFSETS)));
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Converts the {@code start_mode.offsets} option into Kafka topic partitions.
+     *
+     * @param offsetMap key is {@code <topic>-<partition>}, value is the start offset
+     * @return the start offset per topic partition
+     * @throws IllegalArgumentException if the map is empty or a key is not of the form {@code
+     *     <topic>-<partition>} with a numeric partition
+     */
+    private static Map<TopicPartition, Long> parseSpecificStartOffsets(
+            Map<String, Long> offsetMap) {
+        if (MapUtils.isEmpty(offsetMap)) {
+            throw new IllegalArgumentException(
+                    "start mode is "
+                            + StartMode.SPECIFIC_OFFSETS
+                            + " but no specific offsets were specified.");
+        }
+        Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
+        offsetMap.forEach(
+                (topicPartitionKey, offset) -> {
+                    // Topic names may contain '-', so the partition is whatever follows the last one.
+                    int splitIndex = topicPartitionKey.lastIndexOf("-");
+                    if (splitIndex <= 0 || splitIndex == topicPartitionKey.length() - 1) {
+                        throw new IllegalArgumentException(
+                                "Invalid start_mode.offsets key '"
+                                        + topicPartitionKey
+                                        + "', expected format is <topic>-<partition>");
+                    }
+                    String topic = topicPartitionKey.substring(0, splitIndex);
+                    String partition = topicPartitionKey.substring(splitIndex + 1);
+                    try {
+                        specificStartOffsets.put(
+                                new TopicPartition(topic, Integer.parseInt(partition)), offset);
+                    } catch (NumberFormatException e) {
+                        throw new IllegalArgumentException(
+                                "Invalid partition '"
+                                        + partition
+                                        + "' in start_mode.offsets key '"
+                                        + topicPartitionKey
+                                        + "'",
+                                e);
+                    }
+                });
+        return specificStartOffsets;
+    }
+
+    /**
+     * Builds the catalog table produced by the source: from the configured schema when one is
+     * present, otherwise a fallback table with a single {@code content} column.
+     */
+    private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
+        Optional<Map<String, Object>> schemaOptions =
+                readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
+        if (schemaOptions.isPresent()) {
+            return CatalogTableUtil.buildWithConfig(readonlyConfig);
+        } else {
+            TableIdentifier tableIdentifier = TableIdentifier.of(CONNECTOR_IDENTITY, null, null);
+            // NOTE(review): the column type is a row wrapping a single string field, so the
+            // table's row type is nested; confirm this matches the previous simple text schema.
+            TableSchema tableSchema =
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "content",
+                                            new SeaTunnelRowType(
+                                                    new String[] {"content"},
+                                                    new SeaTunnelDataType<?>[] {
+                                                        BasicType.STRING_TYPE
+                                                    }),
+                                            0,
+                                            false,
+                                            null,
+                                            null))
+                            .build();
+            return CatalogTable.of(
+                    tableIdentifier,
+                    tableSchema,
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    null);
+        }
+    }
+
+    /**
+     * Selects the deserialization schema for the configured message format.
+     *
+     * <p>Without a configured schema the format option is not consulted and a text schema with the
+     * placeholder delimiter is returned.
+     *
+     * @throws SeaTunnelJsonFormatException if the configured format is not supported
+     */
+    private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
+            CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+
+        if (!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+            return TextDeserializationSchema.builder()
+                    .seaTunnelRowType(seaTunnelRowType)
+                    .delimiter(TextFormatConstant.PLACEHOLDER)
+                    .build();
+        }
+
+        MessageFormat format = readonlyConfig.get(FORMAT);
+        switch (format) {
+            case JSON:
+                return new JsonDeserializationSchema(false, false, seaTunnelRowType);
+            case TEXT:
+                String delimiter = readonlyConfig.get(FIELD_DELIMITER);
+                return TextDeserializationSchema.builder()
+                        .seaTunnelRowType(seaTunnelRowType)
+                        .delimiter(delimiter)
+                        .build();
+            case CANAL_JSON:
+                return CanalJsonDeserializationSchema.builder(seaTunnelRowType)
+                        .setIgnoreParseErrors(true)
+                        .build();
+            case COMPATIBLE_KAFKA_CONNECT_JSON:
+                Boolean keySchemaEnable =
+                        readonlyConfig.get(
+                                KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
+                Boolean valueSchemaEnable =
+                        readonlyConfig.get(
+                                KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
+                return new CompatibleKafkaConnectDeserializationSchema(
+                        seaTunnelRowType, keySchemaEnable, valueSchemaEnable, false, false);
+            case DEBEZIUM_JSON:
+                boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
+                return new DebeziumJsonDeserializationSchema(seaTunnelRowType, true, includeSchema);
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 21057040ec..4a41d38d6d 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -19,13 +19,18 @@ package
org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
@AutoService(Factory.class)
public class KafkaSourceFactory implements TableSourceFactory {
@@ -54,6 +59,12 @@ public class KafkaSourceFactory implements
TableSourceFactory {
.build();
}
+    /**
+     * Creates the table source for the Kafka connector.
+     *
+     * <p>The returned {@link TableSource} builds a new {@link KafkaSource} from the options of the
+     * given context each time it is asked for a source.
+     *
+     * @param context factory context carrying the connector options
+     * @return a table source backed by {@link KafkaSource}
+     */
+    @Override
+    // The cast only narrows the generic view of KafkaSource to the caller's type variables.
+    @SuppressWarnings("unchecked")
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new KafkaSource(context.getOptions());
+    }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return KafkaSource.class;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index c72eb9d5c3..b75c55153e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -207,6 +207,23 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+    /**
+     * Produces 100 comma-delimited text rows to {@code test_topic_text} and runs the job whose
+     * Assert sink checks both the row values and the catalog table (primary key and constraint
+     * keys) exposed by the Kafka source.
+     */
+    @TestTemplate
+    public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container)
+            throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)),
+                0,
+                100);
+        // Run the job definition that carries the catalog_table_rule assertions.
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
@TestTemplate
public void testSourceKafkaJsonToConsole(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
new file mode 100644
index 0000000000..32649cfcc2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_text"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ format_error_handle_way = fail
+ schema = {
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ {
+ name = c_map
+ type = "map<string, smallint>"
+ }
+ {
+ name = c_array
+ type = "array<tinyint>"
+ }
+ {
+ name = c_string
+ type = "string"
+ }
+ {
+ name = c_boolean
+ type = "boolean"
+ }
+ {
+ name = c_tinyint
+ type = "tinyint"
+ }
+ {
+ name = c_smallint
+ type = "smallint"
+ }
+ {
+ name = c_int
+ type = "int"
+ }
+ {
+ name = c_bigint
+ type = "bigint"
+ }
+ {
+ name = c_float
+ type = "float"
+ }
+ {
+ name = c_double
+ type = "double"
+ }
+ {
+ name = c_decimal
+ type = "decimal(2, 1)"
+ }
+ {
+ name = c_bytes
+ type = "bytes"
+ }
+ {
+ name = c_date
+ type = "date"
+ }
+ {
+ name = c_timestamp
+ type = "timestamp"
+ }
+ ]
+ primaryKey = {
+ name = "primary key"
+ columnNames = ["id"]
+ }
+ constraintKeys = [
+ {
+ constraintName = "unique_c_string"
+ constraintType = UNIQUE_KEY
+ constraintColumns = [
+ {
+ columnName = "c_string"
+ sortType = ASC
+ }
+ ]
+ }
+ ]
+ }
+ format = text
+ field_delimiter = ","
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "kafka_table"
+ }
+ Assert {
+ source_table_name = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ catalog_table_rule = {
+ primary_key_rule = {
+ primary_key_name = "primary key"
+ primary_key_columns = ["id"]
+ }
+ constraint_key_rule = [
+ {
+ constraint_key_name = "unique_c_string"
+ constraint_key_type = UNIQUE_KEY
+ constraint_key_columns = [
+ {
+ constraint_key_column_name = "c_string"
+ constraint_key_sort_type = ASC
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index b2e6ac97e9..896a3fe5e1 100644
---
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -18,9 +18,7 @@
package org.apache.seatunnel.format.compatible.kafka.connect.json;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.RowKind;
@@ -49,7 +47,6 @@ import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
-import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -74,16 +71,14 @@ public class CompatibleKafkaConnectDeserializationSchema
public CompatibleKafkaConnectDeserializationSchema(
@NonNull SeaTunnelRowType seaTunnelRowType,
- @NonNull Config config,
+ boolean keySchemaEnable,
+ boolean valueSchemaEnable,
boolean failOnMissingField,
boolean ignoreParseErrors) {
- Map<String, String> configMap =
ReadonlyConfig.fromConfig(config).toMap();
this.seaTunnelRowType = seaTunnelRowType;
- this.keySchemaEnable =
-
KafkaConnectJsonFormatOptions.getKeyConverterSchemaEnabled(configMap);
- this.valueSchemaEnable =
-
KafkaConnectJsonFormatOptions.getValueConverterSchemaEnabled(configMap);
+ this.keySchemaEnable = keySchemaEnable;
+ this.valueSchemaEnable = valueSchemaEnable;
// Runtime converter
this.runtimeConverter =