This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 34f1f21e4 Add Kafka catalog (#4106)
34f1f21e4 is described below

commit 34f1f21e4888e37e8e8777f85037309c1277ae2e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Feb 11 13:33:05 2023 +0800

    Add Kafka catalog (#4106)
---
 .../api/table/catalog/DataTypeConvertor.java       |  12 +-
 .../seatunnel/common/schema/SeaTunnelSchema.java   |   2 +-
 .../jdbc/catalog/MysqlDataTypeConvertor.java       |  50 ++++-
 .../jdbc/catalog/MysqlDataTypeConvertorTest.java   |  18 +-
 .../seatunnel/kafka/catalog/KafkaCatalog.java      | 239 +++++++++++++++++++++
 .../kafka/catalog/KafkaCatalogFactory.java}        |  32 +--
 .../kafka/catalog/KafkaDataTypeConvertor.java      |  65 ++++++
 .../connectors/seatunnel/kafka/config/Config.java  |   3 +
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |   2 +-
 .../seatunnel/kafka/source/KafkaSource.java        |   3 +-
 .../kafka/catalog/KafkaDataTypeConvertorTest.java} |  17 +-
 11 files changed, 411 insertions(+), 32 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
index fd51ce001..66d10e4d4 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -30,12 +30,20 @@ public interface DataTypeConvertor<T> {
     /**
      * Transfer the data type from connector to SeaTunnel.
      *
-     * @param t                  origin data type
+     * @param connectorType e.g. "int", "varchar(255)"
+     * @return the data type of SeaTunnel
+     */
+    SeaTunnelDataType<?> toSeaTunnelType(String connectorType);
+
+    /**
+     * Transfer the data type from connector to SeaTunnel.
+     *
+     * @param connectorType      origin data type
      * @param dataTypeProperties origin data type properties, e.g. precision, 
scale, length
      * @return SeaTunnel data type
      */
     // todo: If the origin data type contains the properties, we can remove 
the dataTypeProperties.
-    SeaTunnelDataType<?> toSeaTunnelType(T t, Map<String, Object> 
dataTypeProperties) throws DataTypeConvertException;
+    SeaTunnelDataType<?> toSeaTunnelType(T connectorType, Map<String, Object> 
dataTypeProperties) throws DataTypeConvertException;
 
     /**
      * Transfer the data type from SeaTunnel to connector.
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
index 44cdb10e0..3b044cbe3 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
@@ -102,7 +102,7 @@ public class SeaTunnelSchema implements Serializable {
         return new int[]{precision, scale};
     }
 
-    private static SeaTunnelDataType<?> parseTypeByString(String type) {
+    public static SeaTunnelDataType<?> parseTypeByString(String type) {
         // init precision (used by decimal type)
         int precision = 0;
         // init scale (used by decimal type)
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
index 247177033..bc1686a3a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -28,9 +30,11 @@ import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 
+import com.google.common.collect.ImmutableMap;
 import com.mysql.cj.MysqlType;
 import org.apache.commons.collections4.MapUtils;
 
+import java.util.Collections;
 import java.util.Map;
 
 public class MysqlDataTypeConvertor implements DataTypeConvertor<MysqlType> {
@@ -44,9 +48,47 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
     public static final String PRECISION = "precision";
     public static final String SCALE = "scale";
 
+    public static final Integer DEFAULT_PRECISION = 10;
+
+    public static final Integer DEFAULT_SCALE = 0;
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
+        checkNotNull(connectorType, "connectorType can not be null");
+        MysqlType mysqlType = MysqlType.getByName(connectorType);
+        Map<String, Object> dataTypeProperties;
+        switch (mysqlType) {
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                // parse precision and scale
+                int left = connectorType.indexOf("(");
+                int right = connectorType.indexOf(")");
+                int precision = DEFAULT_PRECISION;
+                int scale = DEFAULT_SCALE;
+                if (left != -1 && right != -1) {
+                    String[] precisionAndScale = connectorType.substring(left 
+ 1, right).split(",");
+                    if (precisionAndScale.length == 2) {
+                        precision = Integer.parseInt(precisionAndScale[0]);
+                        scale = Integer.parseInt(precisionAndScale[1]);
+                    } else if (precisionAndScale.length == 1) {
+                        precision = Integer.parseInt(precisionAndScale[0]);
+                    }
+                }
+                dataTypeProperties = ImmutableMap.of(PRECISION, precision, 
SCALE, scale);
+                break;
+            default:
+                dataTypeProperties = Collections.emptyMap();
+                break;
+        }
+        return toSeaTunnelType(mysqlType, dataTypeProperties);
+    }
+
     // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType 
doesn't contains properties.
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(MysqlType mysqlType, 
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(mysqlType, "mysqlType can not be null");
+
         switch (mysqlType) {
             case NULL:
                 return BasicType.VOID_TYPE;
@@ -100,12 +142,8 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
             case BIGINT_UNSIGNED:
             case DECIMAL:
             case DECIMAL_UNSIGNED:
-                Integer precision = MapUtils.getInteger(dataTypeProperties, 
PRECISION);
-                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
-                if (precision == null || scale == null) {
-                    throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType,
-                        new IllegalArgumentException("Decimal type must have 
precision and scale"));
-                }
+                Integer precision = MapUtils.getInteger(dataTypeProperties, 
PRECISION, DEFAULT_PRECISION);
+                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE, 
DEFAULT_SCALE);
                 return new DecimalType(precision, scale);
             // TODO: support 'SET' & 'YEAR' type
             default:
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
index 936e7ea12..4ef05012b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
 
 import com.mysql.cj.MysqlType;
 import org.junit.jupiter.api.Assertions;
@@ -30,7 +31,22 @@ public class MysqlDataTypeConvertorTest {
     private MysqlDataTypeConvertor mysqlDataTypeConvertor = 
MysqlDataTypeConvertor.getInstance();
 
     @Test
-    public void from() {
+    public void toSeaTunnelTypeWithString() {
+        Assertions.assertEquals(
+            new DecimalType(5, 2),
+            mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5,2)"));
+
+        Assertions.assertEquals(
+            new DecimalType(5, 0),
+            mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL(5)"));
+
+        Assertions.assertEquals(
+            new DecimalType(10, 0),
+            mysqlDataTypeConvertor.toSeaTunnelType("DECIMAL"));
+    }
+
+    @Test
+    public void toSeaTunnelType() {
         Assertions.assertEquals(BasicType.VOID_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
         Assertions.assertEquals(BasicType.STRING_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR, 
Collections.emptyMap()));
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
new file mode 100644
index 000000000..9ec868d8a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * This is a KafkaCatalog implementation.
+ * <p> In Kafka, both the database and the table correspond to the topic name.
+ */
+public class KafkaCatalog implements Catalog {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaCatalog.class);
+    private final String catalogName;
+    private final String bootstrapServers;
+    private final String defaultTopic;
+
+    private AdminClient adminClient;
+
+    public KafkaCatalog(String catalogName, String defaultTopic, String 
bootstrapServers) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be 
null");
+        this.bootstrapServers = checkNotNull(bootstrapServers, 
"bootstrapServers cannot be null");
+        this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be 
null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+
+        adminClient = AdminClient.create(properties);
+        try {
+            TopicDescription topicDescription = 
getTopicDescription(defaultTopic);
+            if (topicDescription == null) {
+                throw new DatabaseNotExistException(catalogName, defaultTopic);
+            }
+            LOGGER.info("Catalog {} is established connection to {}, the 
default database is {}",
+                catalogName, bootstrapServers, topicDescription.name());
+        } catch (DatabaseNotExistException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Catalog : %s establish connection to %s error", 
catalogName, bootstrapServers), e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        adminClient.close();
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultTopic;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        checkNotNull(databaseName, "databaseName cannot be null");
+        try {
+            TopicDescription topicDescription = 
getTopicDescription(databaseName);
+            return topicDescription != null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Catalog : %s check database : %s exists error", 
catalogName, databaseName), e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            ListTopicsResult listTopicsResult = adminClient.listTopics();
+            Set<String> topics = listTopicsResult.names().get();
+            return Lists.newArrayList(topics);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CatalogException(String.format("Listing database in 
catalog %s error", catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws 
CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        return Lists.newArrayList(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        return databaseExists(tablePath.getDatabaseName());
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        TopicDescription topicDescription;
+        try {
+            topicDescription = getTopicDescription(tablePath.getTableName());
+            if (topicDescription == null) {
+                throw new TableNotExistException(catalogName, tablePath);
+            }
+        } catch (ExecutionException | InterruptedException e) {
+            throw new CatalogException(
+                String.format("Catalog : %s get table : %s error", 
catalogName, tablePath), e);
+        }
+        TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, 
tablePath.getDatabaseName(), tablePath.getTableName());
+        // todo: Set the schema of the table?
+        TableSchema tableSchema = TableSchema.builder()
+            .build();
+        return CatalogTable.of(
+            tableIdentifier,
+            tableSchema,
+            buildConnectorOptions(topicDescription),
+            Collections.emptyList(),
+            "");
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+        throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        if (tableExists(tablePath)) {
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+        Map<String, String> options = table.getOptions();
+        int partitionNumber = 
Integer.parseInt(options.get(Config.PARTITION.key()));
+        short replicationFactor = 
Short.parseShort(options.get(Config.REPLICATION_FACTOR));
+        NewTopic newTopic = new NewTopic(tablePath.getTableName(), 
partitionNumber, replicationFactor);
+        CreateTopicsResult createTopicsResult = 
adminClient.createTopics(Lists.newArrayList(newTopic));
+        try {
+            createTopicsResult.all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new CatalogException(
+                String.format("Catalog : %s create table : %s error", 
catalogName, tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(Lists.newArrayList(tablePath.getTableName()));
+        try {
+            deleteTopicsResult.all().get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CatalogException(
+                String.format("Catalog : %s drop table : %s error", 
catalogName, tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists) 
throws DatabaseAlreadyExistException, CatalogException {
+        // todo: We cannot create topic here, since we don't know the 
partition number and replication factor.
+        throw new UnsupportedOperationException("Kafka catalog does not 
support create database");
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) 
throws DatabaseNotExistException, CatalogException {
+        // todo:
+        dropTable(tablePath, ignoreIfNotExists);
+    }
+
+    @Override
+    public DataTypeConvertor<?> getDataTypeConvertor() {
+        return KafkaDataTypeConvertor.getInstance();
+    }
+
+    private TopicDescription getTopicDescription(String topicName) throws 
ExecutionException, InterruptedException {
+        DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(Lists.newArrayList(topicName));
+        KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = 
describeTopicsResult.topicNameValues().get(topicName);
+        return topicDescriptionKafkaFuture.get();
+    }
+
+    private Map<String, String> buildConnectorOptions(TopicDescription 
topicDescription) {
+        String topicName = topicDescription.name();
+        List<TopicPartitionInfo> partitions = topicDescription.partitions();
+        List<Node> replicas = partitions.get(0).replicas();
+        // todo: Do we need to support partition has different replication 
factor?
+        Map<String, String> options = new HashMap<>();
+        options.put(Config.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+        options.put(Config.TOPIC.key(), topicName);
+        options.put(Config.PARTITION.key(), String.valueOf(partitions.size()));
+        options.put(Config.REPLICATION_FACTOR, 
String.valueOf(replicas.size()));
+        return options;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
similarity index 51%
copy from 
seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
copy to 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
index 936e7ea12..a30e459d0 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
@@ -15,23 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
 
-import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 
-import com.mysql.cj.MysqlType;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class KafkaCatalogFactory implements CatalogFactory {
 
-import java.util.Collections;
-
-public class MysqlDataTypeConvertorTest {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // todo: Do we need to use singleton here?
+        return null;
+    }
 
-    private MysqlDataTypeConvertor mysqlDataTypeConvertor = 
MysqlDataTypeConvertor.getInstance();
+    @Override
+    public String factoryIdentifier() {
+        return 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+    }
 
-    @Test
-    public void from() {
-        Assertions.assertEquals(BasicType.VOID_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
-        Assertions.assertEquals(BasicType.STRING_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR, 
Collections.emptyMap()));
+    @Override
+    public OptionRule optionRule() {
+        // todo:
+        return null;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
new file mode 100644
index 000000000..70e14c9f9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.Map;
+
+/**
+ * The data type convertor of Kafka, only fields defined in schema has the 
type.
+ * e.g.
+ * <pre>
+ * schema = {
+ *    fields {
+ *      name = "string"
+ *      age = "int"
+ *    }
+ * }
+ * </pre>
+ * <p> Right now the data type of kafka is SeaTunnelType, so we don't need to 
convert the data type.
+ */
+public class KafkaDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    private static final KafkaDataTypeConvertor INSTANCE = new 
KafkaDataTypeConvertor();
+
+    private KafkaDataTypeConvertor() {
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
+        // todo: Do we have utils to deserialize a string to SeaTunnelDataType?
+        return null;
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> 
connectorDataType, Map<String, Object> dataTypeProperties) throws 
DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> 
seaTunnelDataType, Map<String, Object> dataTypeProperties) throws 
DataTypeConvertException {
+        return seaTunnelDataType;
+    }
+
+    public static KafkaDataTypeConvertor getInstance() {
+        return INSTANCE;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2962339b7..38d971b57 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -25,6 +25,9 @@ import java.util.Map;
 
 public class Config {
 
+    public static final String CONNECTOR_IDENTITY = "Kafka";
+    public static final String REPLICATION_FACTOR = "replication.factor";
+
     /**
      * The default data format is JSON
      */
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 9e651bab1..0a60407b4 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -104,6 +104,6 @@ public class KafkaSink implements 
SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 3bcfa6d6d..35e161c98 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -89,7 +89,7 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
     }
 
     @Override
@@ -194,6 +194,7 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     private void setDeserialization(Config config) {
         if (config.hasPath(SCHEMA.key())) {
             Config schema = config.getConfig(SCHEMA.key());
+            // todo: use KafkaDataTypeConvertor here?
             typeInfo = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
             String format = DEFAULT_FORMAT;
             if (config.hasPath(FORMAT.key())) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
similarity index 63%
copy from 
seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
copy to 
seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
index 936e7ea12..6a8633f01 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
@@ -15,23 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
 
 import org.apache.seatunnel.api.table.type.BasicType;
 
-import com.mysql.cj.MysqlType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 
-public class MysqlDataTypeConvertorTest {
+class KafkaDataTypeConvertorTest {
 
-    private MysqlDataTypeConvertor mysqlDataTypeConvertor = 
MysqlDataTypeConvertor.getInstance();
+    private KafkaDataTypeConvertor kafkaDataTypeConvertor = 
KafkaDataTypeConvertor.getInstance();
 
     @Test
-    public void from() {
-        Assertions.assertEquals(BasicType.VOID_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
-        Assertions.assertEquals(BasicType.STRING_TYPE, 
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR, 
Collections.emptyMap()));
+    void toSeaTunnelType() {
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
kafkaDataTypeConvertor.toSeaTunnelType(BasicType.STRING_TYPE, 
Collections.emptyMap()));
+    }
+
+    @Test
+    void toConnectorType() {
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
kafkaDataTypeConvertor.toConnectorType(BasicType.STRING_TYPE, 
Collections.emptyMap()));
     }
 }

Reply via email to