This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 9ee4d8394 Add ElasticSearch catalog (#4108)
9ee4d8394 is described below

commit 9ee4d8394c06e05078b8a216e04bf67f9b3bc73c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Feb 13 09:40:40 2023 +0800

    Add ElasticSearch catalog (#4108)
---
 .../api/table/catalog/DataTypeConvertor.java       |   8 +-
 .../catalog/ElasticSearchCatalog.java              | 158 +++++++++++++++++++++
 .../catalog/ElasticSearchCatalogFactory.java       |  44 ++++++
 .../catalog/ElasticSearchDataTypeConvertor.java}   |  33 ++---
 .../elasticsearch/client/EsRestClient.java         |  46 ++++++
 .../exception/ElasticsearchConnectorErrorCode.java |   5 +-
 .../jdbc/catalog/MysqlDataTypeConvertor.java       |  12 +-
 .../kafka/catalog/KafkaDataTypeConvertor.java      |   9 +-
 8 files changed, 280 insertions(+), 35 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
index 66d10e4d4..5ce924aff 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -30,20 +30,20 @@ public interface DataTypeConvertor<T> {
     /**
      * Transfer the data type from connector to SeaTunnel.
      *
-     * @param connectorType e.g. "int", "varchar(255)"
+     * @param connectorDataType e.g. "int", "varchar(255)"
      * @return the data type of SeaTunnel
      */
-    SeaTunnelDataType<?> toSeaTunnelType(String connectorType);
+    SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType);
 
     /**
      * Transfer the data type from connector to SeaTunnel.
      *
-     * @param connectorType      origin data type
+     * @param connectorDataType  origin data type
      * @param dataTypeProperties origin data type properties, e.g. precision, 
scale, length
      * @return SeaTunnel data type
      */
     // todo: If the origin data type contains the properties, we can remove 
the dataTypeProperties.
-    SeaTunnelDataType<?> toSeaTunnelType(T connectorType, Map<String, Object> 
dataTypeProperties) throws DataTypeConvertException;
+    SeaTunnelDataType<?> toSeaTunnelType(T connectorDataType, Map<String, 
Object> dataTypeProperties) throws DataTypeConvertException;
 
     /**
      * Transfer the data type from SeaTunnel to connector.
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
new file mode 100644
index 000000000..ca725453d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Elasticsearch catalog implementation.
+ * <p>In ElasticSearch, we use the index as the database and table.
+ */
+public class ElasticSearchCatalog implements Catalog {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticSearchCatalog.class);
+
+    private final String catalogName;
+    private final String defaultDatabase;
+    private final Config pluginConfig;
+
+    private EsRestClient esRestClient;
+
+    // todo: do we need default database?
+    public ElasticSearchCatalog(String catalogName, String defaultDatabase, 
Config elasticSearchConfig) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be 
null");
+        this.defaultDatabase = defaultDatabase;
+        this.pluginConfig = checkNotNull(elasticSearchConfig, 
"elasticSearchConfig cannot be null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try {
+            esRestClient = EsRestClient.createInstance(pluginConfig);
+            ElasticsearchClusterInfo elasticsearchClusterInfo = 
esRestClient.getClusterInfo();
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Success open es catalog: {}, cluster info: {}", 
catalogName, elasticsearchClusterInfo);
+            }
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed to open catalog 
%s", catalogName), e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        esRestClient.close();
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultDatabase;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        // check if the index exist
+        try {
+            List<IndexDocsCount> indexDocsCount = 
esRestClient.getIndexDocsCount(databaseName);
+            return true;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed to check if catalog %s database %s 
exists", catalogName, databaseName), e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return esRestClient.listIndex();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws 
CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        return Lists.newArrayList(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath);
+        // todo: Check if the database name is the same with table name
+        return databaseExists(tablePath.getTableName());
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException {
+        // Get the index mapping?
+        return null;
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        // Create the index
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath);
+        if (!tableExists(tablePath) && !ignoreIfNotExists) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        try {
+            esRestClient.dropIndex(tablePath.getTableName());
+        } catch (Exception ex) {
+            throw new CatalogException(
+                String.format("Failed to drop table %s in catalog %s", 
tablePath.getTableName(), catalogName), ex);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists) 
throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException("Elasticsearch does not 
support create database");
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) 
throws DatabaseNotExistException, CatalogException {
+        dropTable(tablePath, ignoreIfNotExists);
+    }
+
+    @Override
+    public DataTypeConvertor<?> getDataTypeConvertor() {
+        return ElasticSearchDataTypeConvertor.getInstance();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
new file mode 100644
index 000000000..55e3460f9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+
+public class ElasticSearchCatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // todo:
+        return null;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        // todo:
+        return "Elasticsearch";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // todo:
+        return null;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
similarity index 67%
copy from 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
copy to 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index 70e14c9f9..1d8f4e51e 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -15,38 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 
 import java.util.Map;
 
-/**
- * The data type convertor of Kafka, only fields defined in schema has the 
type.
- * e.g.
- * <pre>
- * schema = {
- *    fields {
- *      name = "string"
- *      age = "int"
- *    }
- * }
- * </pre>
- * <p> Right now the data type of kafka is SeaTunnelType, so we don't need to 
convert the data type.
- */
-public class KafkaDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataType<?>> {
+public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    private static final ElasticSearchDataTypeConvertor INSTANCE = new 
ElasticSearchDataTypeConvertor();
 
-    private static final KafkaDataTypeConvertor INSTANCE = new 
KafkaDataTypeConvertor();
+    private ElasticSearchDataTypeConvertor() {
 
-    private KafkaDataTypeConvertor() {
     }
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
-        // todo: Do we have utils to deserialize a string to SeaTunnelDataType?
-        return null;
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType);
+        return SeaTunnelSchema.parseTypeByString(connectorDataType);
     }
 
     @Override
@@ -59,7 +50,7 @@ public class KafkaDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataTy
         return seaTunnelDataType;
     }
 
-    public static KafkaDataTypeConvertor getInstance() {
+    public static ElasticSearchDataTypeConvertor getInstance() {
         return INSTANCE;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 2a4ea77a1..b32585503 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -59,6 +59,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class EsRestClient {
@@ -345,6 +346,51 @@ public class EsRestClient {
         }
     }
 
+    public List<String> listIndex() {
+        String endpoint = "/_cat/indices?format=json";
+        Request request = new Request("GET", endpoint);
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED,
+                    "GET " + endpoint + " response null");
+            }
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String entity = EntityUtils.toString(response.getEntity());
+                return JsonUtils.toList(entity, Map.class)
+                    .stream()
+                    .map(map -> map.get("index").toString())
+                    .collect(Collectors.toList());
+            } else {
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED,
+                    String.format("GET %s response status code=%d", endpoint, 
response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.LIST_INDEX_FAILED,
 ex);
+        }
+    }
+
+    public void dropIndex(String tableName) {
+        String endpoint = String.format("/%s", tableName);
+        Request request = new Request("DELETE", endpoint);
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED,
+                    "DELETE " + endpoint + " response null");
+            }
+            // todo: if the index doesn't exist, the response status code is 
200?
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                return;
+            } else {
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED,
+                    String.format("DELETE %s response status code=%d", 
endpoint, response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.DROP_INDEX_FAILED,
 ex);
+        }
+    }
+
     /**
      * get es field name and type mapping realtion
      *
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 2fb5c11c9..739d7d8b2 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -24,7 +24,10 @@ public enum ElasticsearchConnectorErrorCode implements 
SeaTunnelErrorCode {
     BULK_RESPONSE_ERROR("ELASTICSEARCH-01", "Bulk es response error"),
     GET_ES_VERSION_FAILED("ELASTICSEARCH-02", "Get elasticsearch version 
failed"),
     SCROLL_REQUEST_ERROR("ELASTICSEARCH-03", "Fail to scroll request"),
-    GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch 
document index count failed");
+    GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch 
document index count failed"),
+    LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
+    DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
+    ;
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
index bc1686a3a..ea0e3f7d1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
@@ -53,21 +53,21 @@ public class MysqlDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
     public static final Integer DEFAULT_SCALE = 0;
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
-        checkNotNull(connectorType, "connectorType can not be null");
-        MysqlType mysqlType = MysqlType.getByName(connectorType);
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        MysqlType mysqlType = MysqlType.getByName(connectorDataType);
         Map<String, Object> dataTypeProperties;
         switch (mysqlType) {
             case BIGINT_UNSIGNED:
             case DECIMAL:
             case DECIMAL_UNSIGNED:
                 // parse precision and scale
-                int left = connectorType.indexOf("(");
-                int right = connectorType.indexOf(")");
+                int left = connectorDataType.indexOf("(");
+                int right = connectorDataType.indexOf(")");
                 int precision = DEFAULT_PRECISION;
                 int scale = DEFAULT_SCALE;
                 if (left != -1 && right != -1) {
-                    String[] precisionAndScale = connectorType.substring(left 
+ 1, right).split(",");
+                    String[] precisionAndScale = 
connectorDataType.substring(left + 1, right).split(",");
                     if (precisionAndScale.length == 2) {
                         precision = Integer.parseInt(precisionAndScale[0]);
                         scale = Integer.parseInt(precisionAndScale[1]);
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
index 70e14c9f9..b8e315a2d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 
 import java.util.Map;
 
@@ -44,9 +47,9 @@ public class KafkaDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataTy
     }
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(String connectorType) {
-        // todo: Do we have utils to deserialize a string to SeaTunnelDataType?
-        return null;
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        return SeaTunnelSchema.parseTypeByString(connectorDataType);
     }
 
     @Override

Reply via email to