This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 34f1f21e4 Add Kafka catalog (#4106)
34f1f21e4 is described below
commit 34f1f21e4888e37e8e8777f85037309c1277ae2e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Feb 11 13:33:05 2023 +0800
Add Kafka catalog (#4106)
---
.../api/table/catalog/DataTypeConvertor.java | 12 +-
.../seatunnel/common/schema/SeaTunnelSchema.java | 2 +-
.../jdbc/catalog/MysqlDataTypeConvertor.java | 50 ++++-
.../jdbc/catalog/MysqlDataTypeConvertorTest.java | 18 +-
.../seatunnel/kafka/catalog/KafkaCatalog.java | 239 +++++++++++++++++++++
.../kafka/catalog/KafkaCatalogFactory.java} | 32 +--
.../kafka/catalog/KafkaDataTypeConvertor.java | 65 ++++++
.../connectors/seatunnel/kafka/config/Config.java | 3 +
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 2 +-
.../seatunnel/kafka/source/KafkaSource.java | 3 +-
.../kafka/catalog/KafkaDataTypeConvertorTest.java} | 17 +-
11 files changed, 411 insertions(+), 32 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
index fd51ce001..66d10e4d4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -30,12 +30,20 @@ public interface DataTypeConvertor<T> {
/**
* Transfer the data type from connector to SeaTunnel.
*
- * @param t origin data type
+ * @param connectorType e.g. "int", "varchar(255)"
+ * @return the data type of SeaTunnel
+ */
+ SeaTunnelDataType<?> toSeaTunnelType(String connectorType);
+
+ /**
+ * Transfer the data type from connector to SeaTunnel.
+ *
+ * @param connectorType origin data type
* @param dataTypeProperties origin data type properties, e.g. precision,
scale, length
* @return SeaTunnel data type
*/
// todo: If the origin data type contains the properties, we can remove
the dataTypeProperties.
- SeaTunnelDataType<?> toSeaTunnelType(T t, Map<String, Object>
dataTypeProperties) throws DataTypeConvertException;
+ SeaTunnelDataType<?> toSeaTunnelType(T connectorType, Map<String, Object>
dataTypeProperties) throws DataTypeConvertException;
/**
* Transfer the data type from SeaTunnel to connector.
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
index 44cdb10e0..3b044cbe3 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
@@ -102,7 +102,7 @@ public class SeaTunnelSchema implements Serializable {
return new int[]{precision, scale};
}
- private static SeaTunnelDataType<?> parseTypeByString(String type) {
+ public static SeaTunnelDataType<?> parseTypeByString(String type) {
// init precision (used by decimal type)
int precision = 0;
// init scale (used by decimal type)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
index 247177033..bc1686a3a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -28,9 +30,11 @@ import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import com.google.common.collect.ImmutableMap;
import com.mysql.cj.MysqlType;
import org.apache.commons.collections4.MapUtils;
+import java.util.Collections;
import java.util.Map;
public class MysqlDataTypeConvertor implements DataTypeConvertor<MysqlType> {
@@ -44,9 +48,47 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
public static final String PRECISION = "precision";
public static final String SCALE = "scale";
+ public static final Integer DEFAULT_PRECISION = 10;
+
+ public static final Integer DEFAULT_SCALE = 0;
+
+    /**
+     * Converts a MySQL column type string, e.g. {@code "DECIMAL(5,2)"}, to a SeaTunnel data type.
+     *
+     * <p>For decimal-like types the precision and scale are read from the parenthesised part of the
+     * string; anything that is missing falls back to {@link #DEFAULT_PRECISION} / {@link #DEFAULT_SCALE}.
+     *
+     * @param connectorType the MySQL type string, must not be null
+     * @return the corresponding SeaTunnel data type
+     * @throws NumberFormatException if the precision or scale is not a valid integer
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
+        checkNotNull(connectorType, "connectorType can not be null");
+        MysqlType mysqlType = MysqlType.getByName(connectorType);
+        Map<String, Object> dataTypeProperties;
+        switch (mysqlType) {
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                // NOTE(review): an unparameterised BIGINT UNSIGNED also gets DEFAULT_PRECISION here;
+                // confirm that this is wide enough for the values the column can hold.
+                int left = connectorType.indexOf("(");
+                int right = connectorType.indexOf(")");
+                int precision = DEFAULT_PRECISION;
+                int scale = DEFAULT_SCALE;
+                // Only parse when the parentheses are well-formed; otherwise keep the defaults.
+                if (left != -1 && right > left) {
+                    String[] precisionAndScale = connectorType.substring(left + 1, right).split(",");
+                    // Integer.parseInt rejects surrounding whitespace, so "DECIMAL(5, 2)" needs trimming.
+                    if (precisionAndScale.length == 2) {
+                        precision = Integer.parseInt(precisionAndScale[0].trim());
+                        scale = Integer.parseInt(precisionAndScale[1].trim());
+                    } else if (precisionAndScale.length == 1) {
+                        precision = Integer.parseInt(precisionAndScale[0].trim());
+                    }
+                }
+                dataTypeProperties = ImmutableMap.of(PRECISION, precision, SCALE, scale);
+                break;
+            default:
+                dataTypeProperties = Collections.emptyMap();
+                break;
+        }
+        return toSeaTunnelType(mysqlType, dataTypeProperties);
+    }
+
// todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType
doesn't contains properties.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(MysqlType mysqlType,
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+ checkNotNull(mysqlType, "mysqlType can not be null");
+
switch (mysqlType) {
case NULL:
return BasicType.VOID_TYPE;
@@ -100,12 +142,8 @@ public class MysqlDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
case BIGINT_UNSIGNED:
case DECIMAL:
case DECIMAL_UNSIGNED:
- Integer precision = MapUtils.getInteger(dataTypeProperties,
PRECISION);
- Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
- if (precision == null || scale == null) {
- throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType,
- new IllegalArgumentException("Decimal type must have
precision and scale"));
- }
+ Integer precision = MapUtils.getInteger(dataTypeProperties,
PRECISION, DEFAULT_PRECISION);
+ Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE,
DEFAULT_SCALE);
return new DecimalType(precision, scale);
// TODO: support 'SET' & 'YEAR' type
default:
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
index 936e7ea12..4ef05012b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
import com.mysql.cj.MysqlType;
import org.junit.jupiter.api.Assertions;
@@ -30,7 +31,22 @@ public class MysqlDataTypeConvertorTest {
private MysqlDataTypeConvertor mysqlDataTypeConvertor =
MysqlDataTypeConvertor.getInstance();
@Test
- public void from() {
+ public void toSeaTunnelTypeWithString() {
+ Assertions.assertEquals(
+ new DecimalType(5, 2),
+ mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5,2)"));
+
+ Assertions.assertEquals(
+ new DecimalType(5, 0),
+ mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5)"));
+
+ Assertions.assertEquals(
+ new DecimalType(10, 0),
+ mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL"));
+ }
+
+ @Test
+ public void toSeaTunnelType() {
Assertions.assertEquals(BasicType.VOID_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
Assertions.assertEquals(BasicType.STRING_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR,
Collections.emptyMap()));
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
new file mode 100644
index 000000000..9ec868d8a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link Catalog} implementation backed by Kafka topics.
+ * <p> In Kafka the database and the table are both the topic name.
+ */
+public class KafkaCatalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class);
+    private final String catalogName;
+    private final String bootstrapServers;
+    private final String defaultTopic;
+
+    // Created in open() and released in close(); null while the catalog is not open.
+    private AdminClient adminClient;
+
+    /**
+     * @param catalogName      the name of this catalog, must not be null
+     * @param defaultTopic     the topic reported as the default database, must not be null
+     * @param bootstrapServers the Kafka bootstrap servers, must not be null
+     */
+    public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.bootstrapServers = checkNotNull(bootstrapServers, "bootstrapServers cannot be null");
+        this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be null");
+    }
+
+    /**
+     * Creates the admin client and verifies that the default topic exists.
+     *
+     * @throws DatabaseNotExistException if the default topic does not exist
+     * @throws CatalogException          if the topic lookup fails for any other reason
+     */
+    @Override
+    public void open() throws CatalogException {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        adminClient = AdminClient.create(properties);
+        try {
+            TopicDescription topicDescription = getTopicDescription(defaultTopic);
+            if (topicDescription == null) {
+                throw new DatabaseNotExistException(catalogName, defaultTopic);
+            }
+            LOGGER.info("Catalog {} is established connection to {}, the default database is {}",
+                catalogName, bootstrapServers, topicDescription.name());
+        } catch (DatabaseNotExistException e) {
+            // Do not leak the admin client when open() fails.
+            releaseAdminClient();
+            throw e;
+        } catch (Exception e) {
+            releaseAdminClient();
+            throw catalogException(
+                String.format("Catalog : %s establish connection to %s error", catalogName, bootstrapServers), e);
+        }
+    }
+
+    /**
+     * Closes the admin client; a no-op when the catalog was never opened or is already closed.
+     */
+    @Override
+    public void close() throws CatalogException {
+        releaseAdminClient();
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultTopic;
+    }
+
+    /**
+     * @return true if a topic named {@code databaseName} exists, false otherwise
+     * @throws CatalogException if the topic lookup fails
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkNotNull(databaseName, "databaseName cannot be null");
+        try {
+            TopicDescription topicDescription = getTopicDescription(databaseName);
+            return topicDescription != null;
+        } catch (Exception e) {
+            throw catalogException(
+                String.format("Catalog : %s check database : %s exists error", catalogName, databaseName), e);
+        }
+    }
+
+    /**
+     * @return the names of all topics returned by the admin client
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            ListTopicsResult listTopicsResult = adminClient.listTopics();
+            Set<String> topics = listTopicsResult.names().get();
+            return Lists.newArrayList(topics);
+        } catch (InterruptedException | ExecutionException e) {
+            throw catalogException(String.format("Listing database in catalog %s error", catalogName), e);
+        }
+    }
+
+    /**
+     * @return a single-element list holding {@code databaseName}, since the table is the topic itself
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        return Lists.newArrayList(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        // NOTE(review): existence is checked against the database name, while getTable/createTable/dropTable
+        // operate on the table name; this relies on both being the same topic name - confirm with callers.
+        return databaseExists(tablePath.getDatabaseName());
+    }
+
+    /**
+     * Describes the topic named by the table name of {@code tablePath}.
+     *
+     * @throws TableNotExistException if the topic does not exist
+     * @throws CatalogException       if the topic lookup fails
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        TopicDescription topicDescription;
+        try {
+            topicDescription = getTopicDescription(tablePath.getTableName());
+            if (topicDescription == null) {
+                throw new TableNotExistException(catalogName, tablePath);
+            }
+        } catch (ExecutionException | InterruptedException e) {
+            throw catalogException(
+                String.format("Catalog : %s get table : %s error", catalogName, tablePath), e);
+        }
+        TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+        // todo: Set the schema of the table?
+        TableSchema tableSchema = TableSchema.builder()
+            .build();
+        return CatalogTable.of(
+            tableIdentifier,
+            tableSchema,
+            buildConnectorOptions(topicDescription),
+            Collections.emptyList(),
+            "");
+    }
+
+    /**
+     * Creates a topic named by the table name of {@code tablePath}, using the partition number and
+     * replication factor found in the options of {@code table}.
+     *
+     * @param ignoreIfExists when true an already existing table is silently left untouched
+     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
+     * @throws CatalogException           if the topic creation fails
+     */
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+        Map<String, String> options = table.getOptions();
+        int partitionNumber = Integer.parseInt(options.get(Config.PARTITION.key()));
+        short replicationFactor = Short.parseShort(options.get(Config.REPLICATION_FACTOR));
+        NewTopic newTopic = new NewTopic(tablePath.getTableName(), partitionNumber, replicationFactor);
+        CreateTopicsResult createTopicsResult = adminClient.createTopics(Lists.newArrayList(newTopic));
+        try {
+            createTopicsResult.all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw catalogException(
+                String.format("Catalog : %s create table : %s error", catalogName, tablePath.getFullName()), e);
+        }
+    }
+
+    /**
+     * Deletes the topic named by the table name of {@code tablePath}.
+     *
+     * @param ignoreIfNotExists when true a missing table is silently ignored
+     * @throws TableNotExistException if the table is missing and {@code ignoreIfNotExists} is false
+     * @throws CatalogException       if the topic deletion fails
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Lists.newArrayList(tablePath.getTableName()));
+        try {
+            deleteTopicsResult.all().get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw catalogException(
+                String.format("Catalog : %s drop table : %s error", catalogName, tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+        // todo: We cannot create topic here, since we don't know the partition number and replication factor.
+        throw new UnsupportedOperationException("Kafka catalog does not support create database");
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+        // todo: database and table are the same topic, so dropping the database drops the topic.
+        dropTable(tablePath, ignoreIfNotExists);
+    }
+
+    @Override
+    public DataTypeConvertor<?> getDataTypeConvertor() {
+        return KafkaDataTypeConvertor.getInstance();
+    }
+
+    /**
+     * Describes a single topic.
+     *
+     * @return the topic description, or null when the broker reports that the topic does not exist
+     */
+    private TopicDescription getTopicDescription(String topicName) throws ExecutionException, InterruptedException {
+        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Lists.newArrayList(topicName));
+        KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = describeTopicsResult.topicNameValues().get(topicName);
+        try {
+            return topicDescriptionKafkaFuture.get();
+        } catch (ExecutionException e) {
+            // A missing topic completes the future exceptionally instead of yielding null;
+            // translate it so that the callers' "== null" checks actually work.
+            if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Builds the connector options (bootstrap servers, topic, partition count, replication factor)
+     * from a topic description.
+     */
+    private Map<String, String> buildConnectorOptions(TopicDescription topicDescription) {
+        String topicName = topicDescription.name();
+        List<TopicPartitionInfo> partitions = topicDescription.partitions();
+        // The replication factor is taken from the first partition only.
+        List<Node> replicas = partitions.get(0).replicas();
+        // todo: Do we need to support partition has different replication factor?
+        Map<String, String> options = new HashMap<>();
+        options.put(Config.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+        options.put(Config.TOPIC.key(), topicName);
+        options.put(Config.PARTITION.key(), String.valueOf(partitions.size()));
+        options.put(Config.REPLICATION_FACTOR, String.valueOf(replicas.size()));
+        return options;
+    }
+
+    /** Closes the admin client if one is open and clears the reference. */
+    private void releaseAdminClient() {
+        if (adminClient != null) {
+            adminClient.close();
+            adminClient = null;
+        }
+    }
+
+    /** Wraps a failure into a CatalogException, restoring the interrupt flag when it was an interruption. */
+    private static CatalogException catalogException(String message, Exception cause) {
+        if (cause instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+        return new CatalogException(message, cause);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
similarity index 51%
copy from
seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
copy to
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
index 936e7ea12..a30e459d0 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
@@ -15,23 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
-import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
-import com.mysql.cj.MysqlType;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class KafkaCatalogFactory implements CatalogFactory {
-import java.util.Collections;
-
-public class MysqlDataTypeConvertorTest {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // todo: Do we need to use singleton here?
+ return null;
+ }
- private MysqlDataTypeConvertor mysqlDataTypeConvertor =
MysqlDataTypeConvertor.getInstance();
+ @Override
+ public String factoryIdentifier() {
+ return
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+ }
- @Test
- public void from() {
- Assertions.assertEquals(BasicType.VOID_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
- Assertions.assertEquals(BasicType.STRING_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR,
Collections.emptyMap()));
+ @Override
+ public OptionRule optionRule() {
+ // todo:
+ return null;
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
new file mode 100644
index 000000000..70e14c9f9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.Map;
+
+/**
+ * The data type convertor of Kafka; only the fields defined in the schema have a type.
+ * e.g.
+ * <pre>
+ * schema = {
+ *     fields {
+ *         name = "string"
+ *         age = "int"
+ *     }
+ * }
+ * </pre>
+ * <p> Right now the data type of Kafka is the SeaTunnel type, so no conversion is needed.
+ */
+public class KafkaDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    private static final KafkaDataTypeConvertor INSTANCE = new KafkaDataTypeConvertor();
+
+    private KafkaDataTypeConvertor() {
+    }
+
+    /**
+     * Parses a schema type string such as {@code "string"} or {@code "int"} into a SeaTunnel data type.
+     *
+     * @param connectorType the type string as written in the schema config
+     * @return the parsed SeaTunnel data type
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
+        // Reuse the schema parser that the Kafka source already relies on instead of returning null.
+        return org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.parseTypeByString(connectorType);
+    }
+
+    /**
+     * Returns {@code connectorDataType} unchanged; {@code dataTypeProperties} is not used.
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    /**
+     * Returns {@code seaTunnelDataType} unchanged; {@code dataTypeProperties} is not used.
+     */
+    @Override
+    public SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        return seaTunnelDataType;
+    }
+
+    /**
+     * @return the shared convertor instance
+     */
+    public static KafkaDataTypeConvertor getInstance() {
+        return INSTANCE;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2962339b7..38d971b57 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -25,6 +25,9 @@ import java.util.Map;
public class Config {
+ public static final String CONNECTOR_IDENTITY = "Kafka";
+ public static final String REPLICATION_FACTOR = "replication.factor";
+
/**
* The default data format is JSON
*/
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 9e651bab1..0a60407b4 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -104,6 +104,6 @@ public class KafkaSink implements
SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
@Override
public String getPluginName() {
- return "Kafka";
+ return
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 3bcfa6d6d..35e161c98 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -89,7 +89,7 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
@Override
public String getPluginName() {
- return "Kafka";
+ return
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
}
@Override
@@ -194,6 +194,7 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private void setDeserialization(Config config) {
if (config.hasPath(SCHEMA.key())) {
Config schema = config.getConfig(SCHEMA.key());
+ // todo: use KafkaDataTypeConvertor here?
typeInfo =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
String format = DEFAULT_FORMAT;
if (config.hasPath(FORMAT.key())) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
similarity index 63%
copy from
seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
copy to
seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
index 936e7ea12..6a8633f01 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
@@ -15,23 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
import org.apache.seatunnel.api.table.type.BasicType;
-import com.mysql.cj.MysqlType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
-public class MysqlDataTypeConvertorTest {
+class KafkaDataTypeConvertorTest {
- private MysqlDataTypeConvertor mysqlDataTypeConvertor =
MysqlDataTypeConvertor.getInstance();
+ private KafkaDataTypeConvertor kafkaDataTypeConvertor =
KafkaDataTypeConvertor.getInstance();
@Test
- public void from() {
- Assertions.assertEquals(BasicType.VOID_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
- Assertions.assertEquals(BasicType.STRING_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR,
Collections.emptyMap()));
+    void toSeaTunnelType() {
+        // Exercise toSeaTunnelType (not toConnectorType): the convertor hands the type back unchanged.
+        Assertions.assertEquals(BasicType.STRING_TYPE,
+            kafkaDataTypeConvertor.toSeaTunnelType(BasicType.STRING_TYPE, Collections.emptyMap()));
+    }
+
+ @Test
+ void toConnectorType() {
+ Assertions.assertEquals(BasicType.STRING_TYPE,
kafkaDataTypeConvertor.toConnectorType(BasicType.STRING_TYPE,
Collections.emptyMap()));
}
}