This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 72f241871 Support ES catalog get field mapping (#4167)
72f241871 is described below

commit 72f241871325c29b5031f05ad4cefe89f60a2a14
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Feb 20 15:17:38 2023 +0800

    Support ES catalog get field mapping (#4167)
---
 .../api/configuration/util/ConfigUtil.java         | 10 +++
 .../api/configuration/util/ConfigUtilTest.java     | 42 +++++++++++
 .../catalog/ElasticSearchCatalog.java              | 52 +++++++++++++-
 .../catalog/ElasticSearchDataTypeConvertor.java    | 81 +++++++++++++++++++---
 .../elasticsearch/client/EsRestClient.java         | 52 ++++++++++----
 .../exception/ElasticsearchConnectorErrorCode.java |  1 +
 .../elasticsearch/source/ElasticsearchSource.java  |  6 +-
 7 files changed, 219 insertions(+), 25 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
index 13352187a..40ddbf849 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
@@ -21,6 +21,8 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -171,4 +173,12 @@ public class ConfigUtil {
             throw new IllegalArgumentException(String.format("Could not parse 
json, value: %s", o));
         }
     }
+
+    /**
+     * Serializes a {@link Config} to a JSON string: the config's root object is unwrapped into
+     * plain Java values, which are then passed to the existing {@code convertToJsonString} overload.
+     * NOTE(review): unwrapping presumably requires a fully resolved config — confirm for configs
+     * that still contain unresolved substitutions.
+     */
+    public static String convertToJsonString(Config config) {
+        return convertToJsonString(config.root().unwrapped());
+    }
+
+    /**
+     * Parses a JSON (HOCON is accepted as well) string into a {@link Config} via
+     * {@code ConfigFactory.parseString}.
+     */
+    public static Config convertToConfig(String configJson) {
+        return ConfigFactory.parseString(configJson);
+    }
 }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
new file mode 100644
index 000000000..1d0f8bc05
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+
+/**
+ * Tests for the Config/JSON conversion helpers in {@link ConfigUtil}.
+ */
+public class ConfigUtilTest {
+
+    @Test
+    public void convertToJsonString() throws URISyntaxException {
+        // Load /conf/option-test.conf from the test classpath and resolve it, first on its own and
+        // then against JVM system properties; unresolved substitutions are tolerated in both passes.
+        Config config = ConfigFactory
+            
.parseFile(Paths.get(ConfigUtilTest.class.getResource("/conf/option-test.conf").toURI()).toFile())
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(), 
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        // Round-trip: Config -> JSON string -> Config.
+        String configJson = ConfigUtil.convertToJsonString(config);
+        Config parsedConfig = ConfigUtil.convertToConfig(configJson);
+        // Only the "env" sub-tree is compared after the round-trip.
+        Assertions.assertEquals(config.getConfig("env"), 
parsedConfig.getConfig("env"));
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 442cecc37..887808702 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -19,9 +19,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.seatunnel.api.configuration.util.ConfigUtil;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -37,7 +41,10 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Elasticsearch catalog implementation.
@@ -118,12 +125,45 @@ public class ElasticSearchCatalog implements Catalog {
     @Override
     public CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException {
         // Get the index mapping?
-        return null;
+        checkNotNull(tablePath, "tablePath cannot be null");
+        // Report a missing index through the exception declared by the Catalog contract rather than
+        // whatever error the mapping request would raise for an unknown index.
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath, null);
+        }
+        ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor = new ElasticSearchDataTypeConvertor();
+        TableSchema.Builder builder = TableSchema.builder();
+        // An empty field list is passed so that, presumably, the mapping of every field in the index is
+        // returned — TODO confirm against EsRestClient#getFieldTypeMapping.
+        Map<String, String> fieldTypeMapping = esRestClient.getFieldTypeMapping(tablePath.getTableName(), Collections.emptyList());
+        fieldTypeMapping.forEach((fieldName, fieldType) -> {
+            // todo: we need to add a new type TEXT or add length in STRING type
+            // NOTE: toSeaTunnelType throws DataTypeConvertException for ES types it does not map,
+            // which aborts the whole lookup.
+            PhysicalColumn physicalColumn = PhysicalColumn.of(
+                fieldName,
+                elasticSearchDataTypeConvertor.toSeaTunnelType(fieldType),
+                null,
+                true,
+                null,
+                null
+            );
+            builder.column(physicalColumn);
+        });
+
+        return CatalogTable.of(
+            TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
+            builder.build(),
+            buildTableOptions(tablePath),
+            Collections.emptyList(),
+            ""
+        );
     }
 
     @Override
     public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         // Create the index
+        checkNotNull(tablePath, "tablePath cannot be null");
+        boolean indexAlreadyExists = tableExists(tablePath);
+        if (indexAlreadyExists && !ignoreIfExists) {
+            throw new TableAlreadyExistException(catalogName, tablePath, null);
+        }
+        if (indexAlreadyExists) {
+            // The caller asked to tolerate an existing index: nothing left to do.
+            return;
+        }
+        // NOTE: the CatalogTable argument is not used; the index is created without an explicit mapping.
+        esRestClient.createIndex(tablePath.getTableName());
     }
 
     @Override
@@ -142,11 +182,19 @@ public class ElasticSearchCatalog implements Catalog {
 
     @Override
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists) 
throws DatabaseAlreadyExistException, CatalogException {
-        throw new UnsupportedOperationException("Elasticsearch does not 
support create database");
+        // Elasticsearch has no separate database level, so creating a "database" creates an index.
+        // NOTE(review): an existing index is reported by createTable as TableAlreadyExistException,
+        // not DatabaseAlreadyExistException — confirm this is what callers expect.
+        createTable(tablePath, null, ignoreIfExists);
     }
 
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) 
throws DatabaseNotExistException, CatalogException {
         dropTable(tablePath, ignoreIfNotExists);
     }
+
+    /**
+     * Builds the option map attached to the CatalogTable returned by getTable.
+     * "connector" is fixed to "elasticsearch"; "config" holds the JSON form of the given TablePath
+     * (not the plugin config).
+     */
+    private Map<String, String> buildTableOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "elasticsearch");
+        // todo: Right now, we don't use the config in the plugin config, do we need to add bootstrap servers here?
+        options.put("config", ConfigUtil.convertToJsonString(tablePath));
+        return options;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index c9551f433..21b7dc0a7 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -21,30 +21,95 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.api.table.type.SqlType;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Map;
 
 @AutoService(DataTypeConvertor.class)
-public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataType<?>> {
+public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<String> {
+
+    // Elasticsearch field mapping type names handled by this convertor.
+    public static final String STRING = "string";
+    public static final String KEYWORD = "keyword";
+    public static final String TEXT = "text";
+    public static final String BOOLEAN = "boolean";
+    public static final String BYTE = "byte";
+    public static final String SHORT = "short";
+    public static final String INTEGER = "integer";
+    public static final String LONG = "long";
+    public static final String FLOAT = "float";
+    public static final String HALF_FLOAT = "half_float";
+    public static final String DOUBLE = "double";
+    public static final String DATE = "date";
+    // Only produced by toConnectorType (for SqlType.BYTES); toSeaTunnelType does not map it back.
+    public static final String BINARY = "binary";
 
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
-        checkNotNull(connectorDataType);
-        return SeaTunnelSchema.parseTypeByString(connectorDataType);
+        return toSeaTunnelType(connectorDataType, null);
     }
 
     @Override
-    public SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
-        return connectorDataType;
+    /**
+     * Maps an Elasticsearch field type name to a SeaTunnel type.
+     *
+     * @throws DataTypeConvertException if the ES type has no mapping here
+     */
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        switch (connectorDataType) {
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case KEYWORD:
+                return BasicType.STRING_TYPE;
+            case TEXT:
+                return BasicType.STRING_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BYTE:
+                return BasicType.BYTE_TYPE;
+            case SHORT:
+                return BasicType.SHORT_TYPE;
+            case INTEGER:
+                return BasicType.INT_TYPE;
+            case LONG:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case HALF_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                throw new DataTypeConvertException("unsupported connectorDataType: " + connectorDataType);
+        }
     }
 
     @Override
-    public SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
-        return seaTunnelDataType;
+    /**
+     * Maps a SeaTunnel type to an Elasticsearch field type name. Integer widths are kept aligned
+     * with toSeaTunnelType (TINYINT <-> byte, SMALLINT <-> short); unmapped types fall back to a
+     * string field.
+     */
+    public String toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case STRING:
+                // NOTE(review): the "string" mapping type was removed in Elasticsearch 6.x in favour of
+                // "text"/"keyword" — confirm which ES versions must be supported.
+                return STRING;
+            case BOOLEAN:
+                return BOOLEAN;
+            case BYTES:
+                // SqlType.BYTES is binary data (a byte array), which ES stores in a "binary" field,
+                // not in the single-byte numeric "byte" type.
+                return BINARY;
+            case TINYINT:
+                return BYTE;
+            case SMALLINT:
+                return SHORT;
+            case INT:
+                return INTEGER;
+            case BIGINT:
+                return LONG;
+            case FLOAT:
+                return FLOAT;
+            case DOUBLE:
+                return DOUBLE;
+            case DATE:
+            case TIMESTAMP:
+                return DATE;
+            default:
+                // Types without a direct ES counterpart are stored as strings.
+                return STRING;
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index b32585503..151eb7178 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -34,6 +34,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
 import org.apache.http.auth.AuthScope;
@@ -59,6 +60,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -370,6 +372,25 @@ public class EsRestClient {
         }
     }
 
+    /**
+     * Creates the index {@code indexName} by sending {@code PUT /<indexName>} without any mapping
+     * or settings body.
+     * todo: We don't support set the index mapping now.
+     *
+     * @throws ElasticsearchConnectorException (CREATE_INDEX_FAILED) when the request fails with an
+     *     IOException, yields no response, or answers with a status other than 200
+     */
+    public void createIndex(String indexName) {
+        String endpoint = String.format("/%s", indexName);
+        Response response;
+        try {
+            response = restClient.performRequest(new Request("PUT", endpoint));
+        } catch (IOException ex) {
+            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED, ex);
+        }
+        if (response == null) {
+            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED,
+                "PUT " + endpoint + " response null");
+        }
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != HttpStatus.SC_OK) {
+            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED,
+                String.format("PUT %s response status code=%d", endpoint, statusCode));
+        }
+    }
+
     public void dropIndex(String tableName) {
         String endpoint = String.format("/%s", tableName);
         Request request = new Request("DELETE", endpoint);
@@ -437,21 +458,26 @@ public class EsRestClient {
 
     private static Map<String, String> 
getFieldTypeMappingFromProperties(JsonNode properties, List<String> source) {
         Map<String, String> mapping = new HashMap<>();
-        for (String field : source) {
-            JsonNode fieldProperty = properties.get(field);
-            if (fieldProperty == null) {
-                mapping.put(field, "text");
-            } else {
-                if (fieldProperty.has("type")) {
-                    String type = fieldProperty.get("type").asText();
-                    mapping.put(field, type);
-                } else {
-                    log.warn(String.format("fail to get elasticsearch field %s 
mapping type,so give a default type text", field));
-                    mapping.put(field, "text");
-                }
+        Map<String, String> allElasticSearchFieldTypeInfoMap = new HashMap<>();
+        properties.fields().forEachRemaining(entry -> {
+            String fieldName = entry.getKey();
+            JsonNode fieldProperty = entry.getValue();
+            if (fieldProperty.has("type")) {
+                allElasticSearchFieldTypeInfoMap.put(fieldName, 
fieldProperty.get("type").asText());
             }
+        });
+        if (CollectionUtils.isEmpty(source)) {
+            return allElasticSearchFieldTypeInfoMap;
         }
-        return mapping;
+
+        return source.stream().collect(Collectors.toMap(Function.identity(), 
fieldName -> {
+            String fieldType = allElasticSearchFieldTypeInfoMap.get(fieldName);
+            if (fieldType == null) {
+                log.warn("fail to get elasticsearch field {} mapping type,so 
give a default type text", fieldName);
+                return "text";
+            }
+            return fieldType;
+        }));
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 739d7d8b2..b89fb82ae 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -27,6 +27,7 @@ public enum ElasticsearchConnectorErrorCode implements 
SeaTunnelErrorCode {
     GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch 
document index count failed"),
     LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
     DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
+    CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index 
failed"),
     ;
 
     private final String code;
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index 98299c5bb..0303dd7b8 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -28,9 +28,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchDataTypeConvertor;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.EsTypeMappingSeaTunnelType;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -59,6 +59,7 @@ public class ElasticsearchSource implements 
SeaTunnelSource<SeaTunnelRow, Elasti
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.pluginConfig = pluginConfig;
         if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+            // todo: We need to remove the schema in ES.
             Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
             rowTypeInfo = 
SeaTunnelSchema.buildWithConfig(schemaConfig).getSeaTunnelRowType();
             source = Arrays.asList(rowTypeInfo.getFieldNames());
@@ -68,9 +69,10 @@ public class ElasticsearchSource implements 
SeaTunnelSource<SeaTunnelRow, Elasti
             Map<String, String> esFieldType = 
esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX.key()),
 source);
             esRestClient.close();
             SeaTunnelDataType[] fieldTypes = new 
SeaTunnelDataType[source.size()];
+            ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor = 
new ElasticSearchDataTypeConvertor();
             for (int i = 0; i < source.size(); i++) {
                 String esType = esFieldType.get(source.get(i));
-                SeaTunnelDataType seaTunnelDataType = 
EsTypeMappingSeaTunnelType.getSeaTunnelDataType(esType);
+                SeaTunnelDataType seaTunnelDataType = 
elasticSearchDataTypeConvertor.toSeaTunnelType(esType);
                 fieldTypes[i] = seaTunnelDataType;
             }
             rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]), 
fieldTypes);

Reply via email to