This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new 72f241871 Support ES catalog get field mapping (#4167)
72f241871 is described below
commit 72f241871325c29b5031f05ad4cefe89f60a2a14
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Feb 20 15:17:38 2023 +0800
Support ES catalog get field mapping (#4167)
---
.../api/configuration/util/ConfigUtil.java | 10 +++
.../api/configuration/util/ConfigUtilTest.java | 42 +++++++++++
.../catalog/ElasticSearchCatalog.java | 52 +++++++++++++-
.../catalog/ElasticSearchDataTypeConvertor.java | 81 +++++++++++++++++++---
.../elasticsearch/client/EsRestClient.java | 52 ++++++++++----
.../exception/ElasticsearchConnectorErrorCode.java | 1 +
.../elasticsearch/source/ElasticsearchSource.java | 6 +-
7 files changed, 219 insertions(+), 25 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
index 13352187a..40ddbf849 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -171,4 +173,12 @@ public class ConfigUtil {
throw new IllegalArgumentException(String.format("Could not parse
json, value: %s", o));
}
}
+
+    /**
+     * Serializes a {@link Config} to a JSON string.
+     *
+     * <p>The config's root object is unwrapped into plain Java values, which are then handed to
+     * the existing {@code convertToJsonString} overload of this class.
+     *
+     * @param config the configuration to serialize
+     * @return the JSON representation of {@code config}
+     */
+    public static String convertToJsonString(Config config) {
+        return ConfigUtil.convertToJsonString(config.root().unwrapped());
+    }
+
+    /**
+     * Parses a configuration string, such as the output of {@code convertToJsonString(Config)},
+     * back into a {@link Config}.
+     *
+     * @param configJson the serialized configuration
+     * @return the parsed configuration
+     */
+    public static Config convertToConfig(String configJson) {
+        Config parsedConfig = ConfigFactory.parseString(configJson);
+        return parsedConfig;
+    }
}
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
new file mode 100644
index 000000000..1d0f8bc05
--- /dev/null
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+
+public class ConfigUtilTest {
+
+    /**
+     * Round-trips /conf/option-test.conf through ConfigUtil's JSON conversion and checks that the
+     * "env" section is identical after parsing the JSON back.
+     */
+    @Test
+    public void convertToJsonString() throws URISyntaxException {
+        // ConfigResolveOptions is immutable, so one instance can serve both resolve steps.
+        ConfigResolveOptions allowUnresolved = ConfigResolveOptions.defaults().setAllowUnresolved(true);
+        Config config = ConfigFactory
+            .parseFile(Paths.get(ConfigUtilTest.class.getResource("/conf/option-test.conf").toURI()).toFile())
+            .resolve(allowUnresolved)
+            .resolveWith(ConfigFactory.systemProperties(), allowUnresolved);
+
+        String configJson = ConfigUtil.convertToJsonString(config);
+        Config parsedConfig = ConfigUtil.convertToConfig(configJson);
+
+        Assertions.assertEquals(config.getConfig("env"), parsedConfig.getConfig("env"));
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 442cecc37..887808702 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -19,9 +19,13 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.seatunnel.api.configuration.util.ConfigUtil;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
@@ -37,7 +41,10 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Elasticsearch catalog implementation.
@@ -118,12 +125,45 @@ public class ElasticSearchCatalog implements Catalog {
@Override
public CatalogTable getTable(TablePath tablePath) throws CatalogException,
TableNotExistException {
// Get the index mapping?
- return null;
+        // Builds a CatalogTable from the field mapping of the index tablePath.getTableName():
+        // every mapped field becomes one PhysicalColumn.
+        checkNotNull(tablePath, "tablePath cannot be null");
+        // Surface a missing index as the declared TableNotExistException rather than as a
+        // failure of the mapping request below.
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath, null);
+        }
+        ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor = new ElasticSearchDataTypeConvertor();
+        TableSchema.Builder builder = TableSchema.builder();
+        // An empty field list requests all fields (presumably every typed field of the index;
+        // see getFieldTypeMappingFromProperties in EsRestClient).
+        Map<String, String> fieldTypeMapping = esRestClient.getFieldTypeMapping(tablePath.getTableName(), Collections.emptyList());
+        fieldTypeMapping.forEach((fieldName, fieldType) -> {
+            // todo: we need to add a new type TEXT or add length in STRING type
+            PhysicalColumn physicalColumn = PhysicalColumn.of(
+                fieldName,
+                elasticSearchDataTypeConvertor.toSeaTunnelType(fieldType),
+                null,
+                true,
+                null,
+                null
+            );
+            builder.column(physicalColumn);
+        });
+
+        return CatalogTable.of(
+            TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
+            builder.build(),
+            buildTableOptions(tablePath),
+            Collections.emptyList(),
+            ""
+        );
}
@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
// Create the index
+        // Only the index name (tablePath.getTableName()) is used: the CatalogTable schema is
+        // not applied, because EsRestClient.createIndex accepts just a name.
+        checkNotNull(tablePath, "tablePath cannot be null");
+        boolean indexExists = tableExists(tablePath);
+        if (indexExists && !ignoreIfExists) {
+            throw new TableAlreadyExistException(catalogName, tablePath, null);
+        }
+        if (!indexExists) {
+            esRestClient.createIndex(tablePath.getTableName());
+        }
}
@Override
@@ -142,11 +182,19 @@ public class ElasticSearchCatalog implements Catalog {
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
- throw new UnsupportedOperationException("Elasticsearch does not
support create database");
+        // Creating a "database" delegates to createTable with no CatalogTable, i.e. it creates
+        // the index tablePath.getTableName().
+        // NOTE(review): an existing index is reported as TableAlreadyExistException (thrown by
+        // createTable), not the declared DatabaseAlreadyExistException; confirm callers expect that.
+ createTable(tablePath, null, ignoreIfExists);
}
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
+        // Same path-level delegation as createDatabase: dropping a "database" drops the table at
+        // tablePath via dropTable.
dropTable(tablePath, ignoreIfNotExists);
}
+
+    /**
+     * Builds the option map attached to the CatalogTable returned by getTable.
+     *
+     * @param tablePath path of the table; its JSON form is stored under the "config" key
+     * @return a mutable map holding the "connector" and "config" entries
+     */
+    private Map<String, String> buildTableOptions(TablePath tablePath) {
+        // TODO: the plugin config is not used yet; should the bootstrap servers be added here?
+        // NOTE(review): "config" currently holds the JSON form of tablePath, not the connector config.
+        String tablePathJson = ConfigUtil.convertToJsonString(tablePath);
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("connector", "elasticsearch");
+        tableOptions.put("config", tablePathJson);
+        return tableOptions;
+    }
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index c9551f433..21b7dc0a7 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -21,30 +21,95 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.api.table.type.SqlType;
import com.google.auto.service.AutoService;
import java.util.Map;
@AutoService(DataTypeConvertor.class)
-public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<SeaTunnelDataType<?>> {
+public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<String> {
+
+    // Elasticsearch field mapping type names handled by this convertor.
+    public static final String STRING = "string";
+    public static final String KEYWORD = "keyword";
+    public static final String TEXT = "text";
+    public static final String BINARY = "binary";
+    public static final String BOOLEAN = "boolean";
+    public static final String BYTE = "byte";
+    public static final String SHORT = "short";
+    public static final String INTEGER = "integer";
+    public static final String LONG = "long";
+    public static final String FLOAT = "float";
+    public static final String HALF_FLOAT = "half_float";
+    public static final String DOUBLE = "double";
+    public static final String DATE = "date";
@Override
public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
- checkNotNull(connectorDataType);
- return SeaTunnelSchema.parseTypeByString(connectorDataType);
+        return toSeaTunnelType(connectorDataType, null);
}
@Override
- public SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?>
connectorDataType, Map<String, Object> dataTypeProperties) throws
DataTypeConvertException {
- return connectorDataType;
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        // Maps an Elasticsearch field mapping type to a SeaTunnel type; dataTypeProperties is unused.
+        // Throws DataTypeConvertException for types without a mapping.
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        switch (connectorDataType) {
+            case STRING:
+            case KEYWORD:
+            case TEXT:
+            case BINARY:
+                // Elasticsearch stores binary field values as Base64-encoded strings.
+                return BasicType.STRING_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BYTE:
+                return BasicType.BYTE_TYPE;
+            case SHORT:
+                return BasicType.SHORT_TYPE;
+            case INTEGER:
+                return BasicType.INT_TYPE;
+            case LONG:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case HALF_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                throw new DataTypeConvertException("unsupported connectorDataType: " + connectorDataType);
+        }
}
@Override
- public SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?>
seaTunnelDataType, Map<String, Object> dataTypeProperties) throws
DataTypeConvertException {
- return seaTunnelDataType;
+    public String toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        // Maps a SeaTunnel type to an Elasticsearch field mapping type; SQL types without a
+        // dedicated mapping fall back to STRING.
+        // NOTE(review): the legacy "string" mapping type was removed in Elasticsearch 5.0; revisit
+        // (text/keyword) once index mappings are actually created from this value.
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case STRING:
+                return STRING;
+            case BOOLEAN:
+                return BOOLEAN;
+            case BYTES:
+                // Raw bytes belong in a binary field; Elasticsearch "byte" is a single 8-bit integer.
+                return BINARY;
+            case TINYINT:
+                return BYTE;
+            case SMALLINT:
+                return SHORT;
+            case INT:
+                return INTEGER;
+            case BIGINT:
+                return LONG;
+            case FLOAT:
+                return FLOAT;
+            case DOUBLE:
+                return DOUBLE;
+            case TIMESTAMP:
+                return DATE;
+            default:
+                return STRING;
+        }
}
@Override
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index b32585503..151eb7178 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
@@ -59,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@@ -370,6 +372,25 @@ public class EsRestClient {
}
}
+    /**
+     * Creates the index {@code indexName} by sending {@code PUT /<indexName>}.
+     *
+     * <p>TODO: setting the index mapping is not supported yet.
+     *
+     * @param indexName name of the index to create
+     * @throws ElasticsearchConnectorException with CREATE_INDEX_FAILED when the request fails with
+     *     an I/O error, yields no response, or answers with a status other than 200
+     */
+    public void createIndex(String indexName) {
+        String endpoint = String.format("/%s", indexName);
+        Request request = new Request("PUT", endpoint);
+        Response response;
+        try {
+            response = restClient.performRequest(request);
+        } catch (IOException ex) {
+            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED, ex);
+        }
+        if (response == null) {
+            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED,
+                "PUT " + endpoint + " response null");
+        }
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != HttpStatus.SC_OK) {
+            throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.CREATE_INDEX_FAILED,
+                String.format("PUT %s response status code=%d", endpoint, statusCode));
+        }
+    }
+
public void dropIndex(String tableName) {
String endpoint = String.format("/%s", tableName);
Request request = new Request("DELETE", endpoint);
@@ -437,21 +458,26 @@ public class EsRestClient {
+    /**
+     * Extracts the Elasticsearch mapping type of index fields.
+     *
+     * @param properties the mapping "properties" node; a null node is treated as having no fields
+     * @param source field names to resolve; when null or empty, every field that declares a "type"
+     *     is returned
+     * @return field name to Elasticsearch type; requested fields with no declared type map to "text"
+     */
private static Map<String, String>
getFieldTypeMappingFromProperties(JsonNode properties, List<String> source) {
Map<String, String> mapping = new HashMap<>();
- for (String field : source) {
- JsonNode fieldProperty = properties.get(field);
- if (fieldProperty == null) {
- mapping.put(field, "text");
- } else {
- if (fieldProperty.has("type")) {
- String type = fieldProperty.get("type").asText();
- mapping.put(field, type);
- } else {
- log.warn(String.format("fail to get elasticsearch field %s
mapping type,so give a default type text", field));
- mapping.put(field, "text");
- }
-}
-}
- return mapping;
+        // Collect the declared "type" of every top-level field; fields whose mapping has no
+        // "type" attribute are skipped.
+        if (properties != null) {
+            properties.fields().forEachRemaining(entry -> {
+                JsonNode fieldProperty = entry.getValue();
+                if (fieldProperty.has("type")) {
+                    mapping.put(entry.getKey(), fieldProperty.get("type").asText());
+                }
+            });
+        }
+        if (CollectionUtils.isEmpty(source)) {
+            return mapping;
+        }
+
+        // distinct() keeps Collectors.toMap from throwing IllegalStateException when the same
+        // field name is requested more than once.
+        return source.stream().distinct().collect(Collectors.toMap(Function.identity(), fieldName -> {
+            String fieldType = mapping.get(fieldName);
+            if (fieldType == null) {
+                log.warn("fail to get elasticsearch field {} mapping type,so give a default type text", fieldName);
+                return "text";
+            }
+            return fieldType;
+        }));
}
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 739d7d8b2..b89fb82ae 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -27,6 +27,7 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode {
GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch
document index count failed"),
LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
+ CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index
failed"),
;
private final String code;
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index 98299c5bb..0303dd7b8 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -28,9 +28,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchDataTypeConvertor;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.EsTypeMappingSeaTunnelType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -59,6 +59,7 @@ public class ElasticsearchSource implements SeaTunnelSource<SeaTunnelRow, Elasti
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ // todo: We need to remove the schema in ES.
Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
rowTypeInfo =
SeaTunnelSchema.buildWithConfig(schemaConfig).getSeaTunnelRowType();
source = Arrays.asList(rowTypeInfo.getFieldNames());
@@ -68,9 +69,10 @@ public class ElasticsearchSource implements SeaTunnelSource<SeaTunnelRow, Elasti
Map<String, String> esFieldType =
esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX.key()),
source);
esRestClient.close();
SeaTunnelDataType[] fieldTypes = new
SeaTunnelDataType[source.size()];
+ ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
new ElasticSearchDataTypeConvertor();
for (int i = 0; i < source.size(); i++) {
String esType = esFieldType.get(source.get(i));
- SeaTunnelDataType seaTunnelDataType =
EsTypeMappingSeaTunnelType.getSeaTunnelDataType(esType);
+ SeaTunnelDataType seaTunnelDataType =
elasticSearchDataTypeConvertor.toSeaTunnelType(esType);
fieldTypes[i] = seaTunnelDataType;
}
rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]),
fieldTypes);