This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 505c1252bd [Improve][Connector-V2] Add ElasticSearch type converter
(#6546)
505c1252bd is described below
commit 505c1252bd35ef4ec07ec31cdecb92b660000a00
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 29 10:44:34 2024 +0800
[Improve][Connector-V2] Add ElasticSearch type converter (#6546)
---
.../catalog/ElasticSearchCatalog.java | 11 +-
.../catalog/ElasticSearchDataTypeConvertor.java | 89 ++---
.../catalog/ElasticSearchTypeConverter.java | 364 +++++++++++++++++++++
.../elasticsearch/client/EsRestClient.java | 134 ++++++--
.../seatunnel/elasticsearch/client/EsType.java | 76 +++++
.../seatunnel/elasticsearch/dto/IndexInfo.java | 4 +-
.../serialize/ElasticsearchRowSerializer.java | 37 ++-
.../source/DefaultSeaTunnelRowDeserializer.java | 24 +-
.../elasticsearch/sink/ElasticsearchSink.java | 4 +-
.../sink/ElasticsearchSinkFactory.java | 17 +-
.../sink/ElasticsearchSinkWriter.java | 10 +-
.../elasticsearch/source/ElasticsearchSource.java | 12 +-
.../serialize/ElasticsearchRowSerializerTest.java | 8 +-
.../connector/elasticsearch/ElasticsearchIT.java | 47 ++-
.../elasticsearch_source_and_sink_full_type.conf | 97 ++++++
.../elasticsearch/st_index_full_type_data.json | 137 ++++++++
.../elasticsearch/st_index_full_type_mapping.json | 162 +++++++++
17 files changed, 1104 insertions(+), 129 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 066a69c2dc..b1eb60e289 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -32,7 +32,9 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
@@ -146,10 +148,8 @@ public class ElasticSearchCatalog implements Catalog {
throws CatalogException, TableNotExistException {
// Get the index mapping?
checkNotNull(tablePath, "tablePath cannot be null");
- ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
- new ElasticSearchDataTypeConvertor();
TableSchema.Builder builder = TableSchema.builder();
- Map<String, String> fieldTypeMapping =
+ Map<String, BasicTypeDefine<EsType>> fieldTypeMapping =
esRestClient.getFieldTypeMapping(tablePath.getTableName(),
Collections.emptyList());
buildColumnsWithErrorCheck(
tablePath,
@@ -159,8 +159,9 @@ public class ElasticSearchCatalog implements Catalog {
// todo: we need to add a new type TEXT or add length in
STRING type
return PhysicalColumn.of(
nameAndType.getKey(),
- elasticSearchDataTypeConvertor.toSeaTunnelType(
- nameAndType.getKey(),
nameAndType.getValue()),
+ ElasticSearchTypeConverter.INSTANCE
+ .convert(nameAndType.getValue())
+ .getDataType(),
(Long) null,
true,
null,
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index 0e081f83ef..7aecdfb9ea 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -17,11 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import com.google.auto.service.AutoService;
@@ -29,22 +30,11 @@ import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
+/** @deprecated Use {@link ElasticSearchTypeConverter} instead. */
+@Deprecated
@AutoService(DataTypeConvertor.class)
public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<String> {
- public static final String STRING = "string";
- public static final String KEYWORD = "keyword";
- public static final String TEXT = "text";
- public static final String BOOLEAN = "boolean";
- public static final String BYTE = "byte";
- public static final String SHORT = "short";
- public static final String INTEGER = "integer";
- public static final String LONG = "long";
- public static final String FLOAT = "float";
- public static final String HALF_FLOAT = "half_float";
- public static final String DOUBLE = "double";
- public static final String DATE = "date";
-
@Override
public SeaTunnelDataType<?> toSeaTunnelType(String field, String
connectorDataType) {
return toSeaTunnelType(field, connectorDataType, null);
@@ -54,34 +44,14 @@ public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<String>
public SeaTunnelDataType<?> toSeaTunnelType(
String field, String connectorDataType, Map<String, Object>
dataTypeProperties) {
checkNotNull(connectorDataType, "connectorDataType can not be null");
- switch (connectorDataType) {
- case STRING:
- return BasicType.STRING_TYPE;
- case KEYWORD:
- return BasicType.STRING_TYPE;
- case TEXT:
- return BasicType.STRING_TYPE;
- case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
- case BYTE:
- return BasicType.BYTE_TYPE;
- case SHORT:
- return BasicType.SHORT_TYPE;
- case INTEGER:
- return BasicType.INT_TYPE;
- case LONG:
- return BasicType.LONG_TYPE;
- case FLOAT:
- return BasicType.FLOAT_TYPE;
- case HALF_FLOAT:
- return BasicType.FLOAT_TYPE;
- case DOUBLE:
- return BasicType.DOUBLE_TYPE;
- case DATE:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- default:
- return BasicType.STRING_TYPE;
- }
+ BasicTypeDefine<EsType> typeDefine =
+ BasicTypeDefine.<EsType>builder()
+ .name(field)
+ .columnType(connectorDataType)
+ .dataType(connectorDataType)
+ .build();
+
+ return
ElasticSearchTypeConverter.INSTANCE.convert(typeDefine).getDataType();
}
@Override
@@ -90,29 +60,14 @@ public class ElasticSearchDataTypeConvertor implements
DataTypeConvertor<String>
SeaTunnelDataType<?> seaTunnelDataType,
Map<String, Object> dataTypeProperties) {
checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
- SqlType sqlType = seaTunnelDataType.getSqlType();
- switch (sqlType) {
- case STRING:
- return STRING;
- case BOOLEAN:
- return BOOLEAN;
- case BYTES:
- return BYTE;
- case TINYINT:
- return SHORT;
- case INT:
- return INTEGER;
- case BIGINT:
- return LONG;
- case FLOAT:
- return FLOAT;
- case DOUBLE:
- return DOUBLE;
- case TIMESTAMP:
- return DATE;
- default:
- return STRING;
- }
+ Column column =
+ PhysicalColumn.builder()
+ .name(field)
+ .dataType(seaTunnelDataType)
+ .nullable(true)
+ .build();
+ BasicTypeDefine<EsType> typeDefine =
ElasticSearchTypeConverter.INSTANCE.reconvert(column);
+ return typeDefine.getColumnType();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
new file mode 100644
index 0000000000..c7e21d1385
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeConverter;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.AGGREGATE_METRIC_DOUBLE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.BINARY;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.BOOLEAN;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.BYTE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.COMPLETION;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_RANGE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DOUBLE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DOUBLE_RANGE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.FLATTENED;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.FLOAT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.FLOAT_RANGE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.GEO_POINT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.GEO_SHAPE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.HALF_FLOAT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.HISTOGRAM;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.INTEGER;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.INTEGER_RANGE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.IP;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.IP_RANGE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.JOIN;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.KEYWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.RANK_FEATURE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.RANK_FEATURES;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SEARCH_AS_YOU_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SHAPE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SHORT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SPARSE_VECTOR;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.STRING;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.TEXT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.TOKEN_COUNT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.UNSIGNED_LONG;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.VERSION;
+
+/**
+ * Converts between Elasticsearch field type definitions and SeaTunnel columns.
+ *
+ * <p>{@link #convert(BasicTypeDefine)} maps an Elasticsearch mapping entry to a SeaTunnel
+ * {@link Column}; {@link #reconvert(Column)} maps a SeaTunnel column back to the Elasticsearch
+ * type used for it.
+ */
+@AutoService(TypeConverter.class)
+public class ElasticSearchTypeConverter implements BasicTypeConverter<BasicTypeDefine<EsType>> {
+    public static final ElasticSearchTypeConverter INSTANCE = new ElasticSearchTypeConverter();
+
+    /** Format attached to generated {@code date} fields (millisecond resolution). */
+    private static final String DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis";
+
+    /** Format attached to generated {@code date_nanos} fields (nanosecond resolution). */
+    private static final String DEFAULT_DATE_NANOS_FORMAT =
+            "strict_date_optional_time_nanos||epoch_millis";
+
+    @Override
+    public String identifier() {
+        return "Elasticsearch";
+    }
+
+    /**
+     * Maps an Elasticsearch field definition to a SeaTunnel column.
+     *
+     * <p>{@code aggregate_metric_double}, {@code dense_vector} and {@code object} read extra
+     * mapping metadata from the native type options. When that metadata is missing (for example
+     * when the definition was built from a bare type name), {@code aggregate_metric_double} and
+     * {@code object} fall back to STRING and {@code dense_vector} is treated as a float array
+     * instead of failing with a NullPointerException. Every type without a dedicated mapping is
+     * converted to STRING.
+     *
+     * @param typeDefine Elasticsearch field definition; its data type must not be null
+     * @return the SeaTunnel column carrying the name, nullability, default value and comment of
+     *     {@code typeDefine}
+     */
+    @Override
+    public Column convert(BasicTypeDefine<EsType> typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .sourceType(typeDefine.getColumnType())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+        // Locale.ROOT keeps the lookup stable under locales with special casing rules.
+        String type = typeDefine.getDataType().toLowerCase(java.util.Locale.ROOT);
+        // May be null when the definition carries no native type information.
+        Map<String, Object> options = nativeOptions(typeDefine);
+        switch (type) {
+            case AGGREGATE_METRIC_DOUBLE:
+                Object metricsOption = options == null ? null : options.get("metrics");
+                if (metricsOption instanceof List) {
+                    @SuppressWarnings("unchecked")
+                    List<String> metrics = (List<String>) metricsOption;
+                    // One DOUBLE field per configured metric name.
+                    builder.dataType(
+                            new SeaTunnelRowType(
+                                    metrics.toArray(new String[0]),
+                                    metrics.stream()
+                                            .map(s -> BasicType.DOUBLE_TYPE)
+                                            .toArray(SeaTunnelDataType<?>[]::new)));
+                } else {
+                    builder.dataType(BasicType.STRING_TYPE);
+                }
+                break;
+            case DENSE_VECTOR:
+                Object elementType = options == null ? null : options.get("element_type");
+                // A missing element_type is treated as float.
+                if (elementType != null && "byte".equals(elementType.toString())) {
+                    builder.dataType(ArrayType.BYTE_ARRAY_TYPE);
+                } else {
+                    builder.dataType(ArrayType.FLOAT_ARRAY_TYPE);
+                }
+                break;
+            case BYTE:
+                builder.dataType(BasicType.BYTE_TYPE);
+                break;
+            case BOOLEAN:
+                builder.dataType(BasicType.BOOLEAN_TYPE);
+                break;
+            case DATE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(3);
+                break;
+            case DATE_NANOS:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(9);
+                break;
+            case DOUBLE:
+            case RANK_FEATURE:
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case FLOAT:
+            case HALF_FLOAT:
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case HISTOGRAM:
+                SeaTunnelRowType rowType =
+                        new SeaTunnelRowType(
+                                new String[] {"values", "counts"},
+                                new SeaTunnelDataType<?>[] {
+                                    ArrayType.DOUBLE_ARRAY_TYPE, ArrayType.LONG_ARRAY_TYPE
+                                });
+                builder.dataType(rowType);
+                break;
+            case INTEGER:
+            case TOKEN_COUNT:
+                builder.dataType(BasicType.INT_TYPE);
+                break;
+            case LONG:
+                builder.dataType(BasicType.LONG_TYPE);
+                break;
+            case SHORT:
+                builder.dataType(BasicType.SHORT_TYPE);
+                break;
+            case OBJECT:
+                if (options == null) {
+                    // No sub-field information available: keep the value as a string.
+                    builder.dataType(BasicType.STRING_TYPE);
+                    break;
+                }
+                // For object fields the options map holds one type definition per sub-field.
+                Map<String, BasicTypeDefine<EsType>> typeInfo = (Map) options;
+                SeaTunnelRowType object =
+                        new SeaTunnelRowType(
+                                typeInfo.keySet().toArray(new String[0]),
+                                typeInfo.values().stream()
+                                        .map(this::convert)
+                                        .map(Column::getDataType)
+                                        .toArray(SeaTunnelDataType<?>[]::new));
+                builder.dataType(object);
+                break;
+            case INTEGER_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE));
+                break;
+            case FLOAT_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.FLOAT_TYPE));
+                break;
+            case LONG_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.LONG_TYPE));
+                break;
+            case DOUBLE_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.DOUBLE_TYPE));
+                break;
+            case DATE_RANGE:
+                builder.dataType(
+                        new MapType<>(BasicType.STRING_TYPE, LocalTimeType.LOCAL_DATE_TIME_TYPE));
+                break;
+            case IP_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE));
+                break;
+            case UNSIGNED_LONG:
+                builder.dataType(new DecimalType(20, 0));
+                builder.columnLength(20L);
+                builder.scale(0);
+                break;
+            case TEXT:
+            case BINARY:
+            case VERSION:
+            case IP:
+            case JOIN:
+            case KEYWORD:
+            case FLATTENED:
+            case GEO_POINT:
+            case COMPLETION:
+            case STRING:
+            case GEO_SHAPE:
+            case NESTED:
+            case PERCOLATOR:
+            case POINT:
+            case RANK_FEATURES:
+            case SEARCH_AS_YOU_TYPE:
+            case SPARSE_VECTOR:
+            case MATCH_ONLY_TEXT:
+            case SHAPE:
+            default:
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+        }
+        return builder.build();
+    }
+
+    /**
+     * Maps a SeaTunnel column to the Elasticsearch type definition used for it.
+     *
+     * <p>DATE and TIMESTAMP columns become {@code date_nanos} when the column scale is greater
+     * than 3 and {@code date} otherwise; each carries the format matching its resolution. ARRAY
+     * columns are mapped by their element type. ROW columns become {@code object} with one nested
+     * definition per field. Types without a dedicated mapping become {@code text}.
+     *
+     * @param column SeaTunnel column to translate
+     * @return the Elasticsearch type definition with column type, data type and native type set
+     */
+    @Override
+    public BasicTypeDefine<EsType> reconvert(Column column) {
+        BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder =
+                BasicTypeDefine.<EsType>builder()
+                        .name(column.getName())
+                        .nullable(column.isNullable())
+                        .comment(column.getComment())
+                        .defaultValue(column.getDefaultValue());
+        switch (column.getDataType().getSqlType()) {
+            case BOOLEAN:
+                applyType(builder, BOOLEAN);
+                break;
+            case BYTES:
+                applyType(builder, BINARY);
+                break;
+            case TINYINT:
+                applyType(builder, BYTE);
+                break;
+            case SMALLINT:
+                applyType(builder, SHORT);
+                break;
+            case INT:
+                applyType(builder, INTEGER);
+                break;
+            case BIGINT:
+                applyType(builder, LONG);
+                break;
+            case FLOAT:
+                applyType(builder, FLOAT);
+                break;
+            case DOUBLE:
+                applyType(builder, DOUBLE);
+                break;
+            case DATE:
+            case TIMESTAMP:
+                Map<String, Object> option = new HashMap<>();
+                if (column.getScale() != null && column.getScale() > 3) {
+                    // Sub-millisecond precision needs date_nanos and the nanos-aware format.
+                    option.put("format", DEFAULT_DATE_NANOS_FORMAT);
+                    applyType(builder, DATE_NANOS, option);
+                } else {
+                    option.put("format", DEFAULT_DATE_FORMAT);
+                    applyType(builder, DATE, option);
+                }
+                break;
+            case MAP:
+                applyType(builder, FLATTENED);
+                break;
+            case ARRAY:
+                SeaTunnelDataType<?> elementType =
+                        ((ArrayType<?, ?>) column.getDataType()).getElementType();
+                applyType(builder, arrayElementEsType(elementType));
+                break;
+            case ROW:
+                SeaTunnelRowType row = (SeaTunnelRowType) column.getDataType();
+                Map<String, BasicTypeDefine<EsType>> typeInfo = new HashMap<>();
+                for (int i = 0; i < row.getFieldNames().length; i++) {
+                    typeInfo.put(
+                            row.getFieldName(i),
+                            reconvert(
+                                    PhysicalColumn.of(
+                                            row.getFieldName(i),
+                                            row.getFieldType(i),
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            null)));
+                }
+                // The nested definitions travel as the options of the object type.
+                applyType(builder, OBJECT, (Map) typeInfo);
+                break;
+            case DECIMAL:
+            case TIME:
+            case NULL:
+            case STRING:
+            default:
+                applyType(builder, TEXT);
+                break;
+        }
+        return builder.build();
+    }
+
+    /** Returns the native type options of {@code typeDefine}, or null when it has none. */
+    private static Map<String, Object> nativeOptions(BasicTypeDefine<EsType> typeDefine) {
+        EsType nativeType = typeDefine.getNativeType();
+        return nativeType == null ? null : nativeType.getOptions();
+    }
+
+    /** Picks the Elasticsearch type for an array from the SeaTunnel type of its elements. */
+    private static String arrayElementEsType(SeaTunnelDataType<?> elementType) {
+        if (BasicType.BYTE_TYPE.equals(elementType)) {
+            return BINARY;
+        } else if (BasicType.SHORT_TYPE.equals(elementType)) {
+            return SHORT;
+        } else if (BasicType.INT_TYPE.equals(elementType)) {
+            return INTEGER;
+        } else if (BasicType.LONG_TYPE.equals(elementType)) {
+            return LONG;
+        } else if (BasicType.FLOAT_TYPE.equals(elementType)) {
+            return FLOAT;
+        } else if (BasicType.DOUBLE_TYPE.equals(elementType)) {
+            return DOUBLE;
+        }
+        // Strings and every other element type are stored as text.
+        return TEXT;
+    }
+
+    /** Sets column type, data type and native type to {@code esType} with empty options. */
+    private static void applyType(
+            BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder, String esType) {
+        applyType(builder, esType, new HashMap<>());
+    }
+
+    /** Sets column type, data type and native type to {@code esType} with the given options. */
+    private static void applyType(
+            BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder,
+            String esType,
+            Map<String, Object> options) {
+        builder.columnType(esType);
+        builder.dataType(esType);
+        builder.nativeType(new EsType(esType, options));
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 50c47d1334..18c9b7c109 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -19,10 +19,12 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
@@ -34,6 +36,7 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.Elastic
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
@@ -65,6 +68,13 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.AGGREGATE_METRIC_DOUBLE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.ALIAS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
+
@Slf4j
public class EsRestClient {
@@ -174,7 +184,7 @@ public class EsRestClient {
keystorePassword,
truststorePath,
truststorePassword);
- sslContext.ifPresent(e ->
httpClientBuilder.setSSLContext(e));
+
sslContext.ifPresent(httpClientBuilder::setSSLContext);
} else {
SSLContext sslContext =
SSLContexts.custom()
@@ -237,7 +247,7 @@ public class EsRestClient {
.clusterVersion(versionNode.get("number").asText())
.distribution(
Optional.ofNullable(versionNode.get("distribution"))
- .map(e -> e.asText())
+ .map(JsonNode::asText)
.orElse(null))
.build();
} catch (IOException e) {
@@ -276,9 +286,7 @@ public class EsRestClient {
param.put("sort", new String[] {"_doc"});
param.put("size", scrollSize);
String endpoint = "/" + index + "/_search?scroll=" + scrollTime;
- ScrollResult scrollResult =
- getDocsFromScrollRequest(endpoint,
JsonUtils.toJsonString(param));
- return scrollResult;
+ return getDocsFromScrollRequest(endpoint,
JsonUtils.toJsonString(param));
}
/**
@@ -291,9 +299,7 @@ public class EsRestClient {
Map<String, String> param = new HashMap<>();
param.put("scroll_id", scrollId);
param.put("scroll", scrollTime);
- ScrollResult scrollResult =
- getDocsFromScrollRequest("/_search/scroll",
JsonUtils.toJsonString(param));
- return scrollResult;
+ return getDocsFromScrollRequest("/_search/scroll",
JsonUtils.toJsonString(param));
}
private ScrollResult getDocsFromScrollRequest(String endpoint, String
requestBody) {
@@ -319,8 +325,7 @@ public class EsRestClient {
"POST %s,total shards(%d)!= successful
shards(%d)",
endpoint, totalShards, successful));
- ScrollResult scrollResult =
getDocsFromScrollResponse(responseJson);
- return scrollResult;
+ return getDocsFromScrollResponse(responseJson);
} else {
throw new ElasticsearchConnectorException(
ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
@@ -345,13 +350,11 @@ public class EsRestClient {
List<Map<String, Object>> docs = new ArrayList<>(hitsNode.size());
scrollResult.setDocs(docs);
- Iterator<JsonNode> iter = hitsNode.iterator();
- while (iter.hasNext()) {
+ for (JsonNode jsonNode : hitsNode) {
Map<String, Object> doc = new HashMap<>();
- JsonNode hitNode = iter.next();
- doc.put("_index", hitNode.get("_index").textValue());
- doc.put("_id", hitNode.get("_id").textValue());
- JsonNode source = hitNode.get("_source");
+ doc.put("_index", jsonNode.get("_index").textValue());
+ doc.put("_id", jsonNode.get("_id").textValue());
+ JsonNode source = jsonNode.get("_source");
for (Iterator<Map.Entry<String, JsonNode>> iterator =
source.fields();
iterator.hasNext(); ) {
Map.Entry<String, JsonNode> entry = iterator.next();
@@ -379,9 +382,7 @@ public class EsRestClient {
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
- List<IndexDocsCount> indexDocsCounts =
- JsonUtils.toList(entity, IndexDocsCount.class);
- return indexDocsCounts;
+ return JsonUtils.toList(entity, IndexDocsCount.class);
} else {
throw new ElasticsearchConnectorException(
ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
@@ -423,10 +424,16 @@ public class EsRestClient {
}
}
- // todo: We don't support set the index mapping now.
public void createIndex(String indexName) {
+ createIndex(indexName, null);
+ }
+
+ public void createIndex(String indexName, String mapping) {
String endpoint = String.format("/%s", indexName);
Request request = new Request("PUT", endpoint);
+ if (StringUtils.isNotEmpty(mapping)) {
+ request.setJsonEntity(mapping);
+ }
try {
Response response = restClient.performRequest(request);
if (response == null) {
@@ -479,10 +486,11 @@ public class EsRestClient {
* @param index index name
* @return {key-> field name,value->es type}
*/
- public Map<String, String> getFieldTypeMapping(String index, List<String>
source) {
+ public Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(
+ String index, List<String> source) {
String endpoint = String.format("/%s/_mappings", index);
Request request = new Request("GET", endpoint);
- Map<String, String> mapping = new HashMap<>();
+ Map<String, BasicTypeDefine<EsType>> mapping = new HashMap<>();
try {
Response response = restClient.performRequest(request);
if (response == null) {
@@ -525,9 +533,9 @@ public class EsRestClient {
return mapping;
}
- private static Map<String, String> getFieldTypeMappingFromProperties(
+ private static Map<String, BasicTypeDefine<EsType>>
getFieldTypeMappingFromProperties(
JsonNode properties, List<String> source) {
- Map<String, String> allElasticSearchFieldTypeInfoMap = new HashMap<>();
+ Map<String, BasicTypeDefine<EsType>> allElasticSearchFieldTypeInfoMap
= new HashMap<>();
properties
.fields()
.forEachRemaining(
@@ -535,26 +543,96 @@ public class EsRestClient {
String fieldName = entry.getKey();
JsonNode fieldProperty = entry.getValue();
if (fieldProperty.has("type")) {
- allElasticSearchFieldTypeInfoMap.put(
- fieldName,
fieldProperty.get("type").asText());
+ String type =
fieldProperty.get("type").asText();
+ BasicTypeDefine.BasicTypeDefineBuilder<EsType>
typeDefine =
+ BasicTypeDefine.<EsType>builder()
+ .name(fieldName)
+ .columnType(type)
+ .dataType(type);
+ if
(type.equalsIgnoreCase(AGGREGATE_METRIC_DOUBLE)) {
+ ArrayNode metrics = ((ArrayNode)
fieldProperty.get("metrics"));
+ List<String> metricsList = new
ArrayList<>();
+ for (JsonNode node : metrics) {
+ metricsList.add(node.asText());
+ }
+ Map<String, Object> options = new
HashMap<>();
+ options.put("metrics", metricsList);
+ typeDefine.nativeType(new EsType(type,
options));
+ } else if (type.equalsIgnoreCase(ALIAS)) {
+ String path =
fieldProperty.get("path").asText();
+ Map<String, Object> options = new
HashMap<>();
+ options.put("path", path);
+ typeDefine.nativeType(new EsType(type,
options));
+ } else if
(type.equalsIgnoreCase(DENSE_VECTOR)) {
+ String elementType =
+ fieldProperty.get("element_type")
== null
+ ? "float"
+ :
fieldProperty.get("element_type").asText();
+ Map<String, Object> options = new
HashMap<>();
+ options.put("element_type", elementType);
+ typeDefine.nativeType(new EsType(type,
options));
+ } else if (type.equalsIgnoreCase(DATE)
+ || type.equalsIgnoreCase(DATE_NANOS)) {
+ String format =
+ fieldProperty.get("format") != null
+ ?
fieldProperty.get("format").asText()
+ :
"strict_date_optional_time_nanos||epoch_millis";
+ Map<String, Object> options = new
HashMap<>();
+ options.put("format", format);
+ typeDefine.nativeType(new EsType(type,
options));
+ } else {
+ typeDefine.nativeType(new EsType(type, new
HashMap<>()));
+ }
+
allElasticSearchFieldTypeInfoMap.put(fieldName, typeDefine.build());
+ } else if (fieldProperty.has("properties")) {
+ // no explicit "type" but nested "properties": treat the field as an object type
+ JsonNode propertiesNode =
fieldProperty.get("properties");
+ List<String> fields = new ArrayList<>();
+
propertiesNode.fieldNames().forEachRemaining(fields::add);
+ Map<String, BasicTypeDefine<EsType>>
subFieldTypeInfoMap =
+
getFieldTypeMappingFromProperties(propertiesNode, fields);
+ BasicTypeDefine.BasicTypeDefineBuilder<EsType>
typeDefine =
+ BasicTypeDefine.<EsType>builder()
+ .name(fieldName)
+ .columnType(OBJECT)
+ .dataType(OBJECT);
+ typeDefine.nativeType(
+ new EsType(OBJECT, (Map)
subFieldTypeInfoMap));
+
allElasticSearchFieldTypeInfoMap.put(fieldName, typeDefine.build());
}
});
if (CollectionUtils.isEmpty(source)) {
return allElasticSearchFieldTypeInfoMap;
}
+ allElasticSearchFieldTypeInfoMap.forEach(
+ (fieldName, fieldType) -> {
+ if (fieldType.getDataType().equalsIgnoreCase(ALIAS)) {
+ BasicTypeDefine<EsType> type =
+ allElasticSearchFieldTypeInfoMap.get(
+
fieldType.getNativeType().getOptions().get("path"));
+ if (type != null) {
+ allElasticSearchFieldTypeInfoMap.put(fieldName,
type);
+ }
+ }
+ });
+
return source.stream()
.collect(
Collectors.toMap(
Function.identity(),
fieldName -> {
- String fieldType =
+ BasicTypeDefine<EsType> fieldType =
allElasticSearchFieldTypeInfoMap.get(fieldName);
if (fieldType == null) {
log.warn(
"fail to get elasticsearch
field {} mapping type,so give a default type text",
fieldName);
- return "text";
+ return
BasicTypeDefine.<EsType>builder()
+ .name(fieldName)
+ .columnType("text")
+ .dataType("text")
+ .build();
}
return fieldType;
}));
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsType.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsType.java
new file mode 100644
index 0000000000..921ed44e98
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsType.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Map;
+
+@Getter
+@AllArgsConstructor
+public class EsType {
+
+ public static final String AGGREGATE_METRIC_DOUBLE =
"aggregate_metric_double";
+ public static final String ALIAS = "alias";
+ public static final String BINARY = "binary";
+ public static final String BYTE = "byte";
+ public static final String BOOLEAN = "boolean";
+ public static final String COMPLETION = "completion";
+ public static final String DATE = "date";
+ public static final String DATE_NANOS = "date_nanos";
+ public static final String DENSE_VECTOR = "dense_vector";
+ public static final String DOUBLE = "double";
+ public static final String FLATTENED = "flattened";
+ public static final String FLOAT = "float";
+ public static final String GEO_POINT = "geo_point";
+ public static final String GEO_SHAPE = "geo_shape";
+ public static final String POINT = "point";
+ public static final String INTEGER_RANGE = "integer_range";
+ public static final String FLOAT_RANGE = "float_range";
+ public static final String LONG_RANGE = "long_range";
+ public static final String DOUBLE_RANGE = "double_range";
+ public static final String DATE_RANGE = "date_range";
+ public static final String IP_RANGE = "ip_range";
+ public static final String HALF_FLOAT = "half_float";
+ public static final String SCALED_FLOAT = "scaled_float";
+ public static final String HISTOGRAM = "histogram";
+ public static final String INTEGER = "integer";
+ public static final String IP = "ip";
+ public static final String JOIN = "join";
+ public static final String KEYWORD = "keyword";
+ public static final String LONG = "long";
+ public static final String NESTED = "nested";
+ public static final String OBJECT = "object";
+ public static final String PERCOLATOR = "percolator";
+ public static final String RANK_FEATURE = "rank_feature";
+ public static final String RANK_FEATURES = "rank_features";
+ public static final String SEARCH_AS_YOU_TYPE = "search_as_you_type";
+ public static final String SHORT = "short";
+ public static final String SHAPE = "shape";
+ public static final String STRING = "string";
+ public static final String SPARSE_VECTOR = "sparse_vector";
+ public static final String TEXT = "text";
+ public static final String MATCH_ONLY_TEXT = "match_only_text";
+ public static final String TOKEN_COUNT = "token_count";
+ public static final String UNSIGNED_LONG = "unsigned_long";
+ public static final String VERSION = "version";
+
+ private String type;
+ private Map<String, Object> options;
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index cb10ed58c0..67226341b5 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -31,8 +31,8 @@ public class IndexInfo {
private String[] primaryKeys;
private String keyDelimiter;
- public IndexInfo(ReadonlyConfig config) {
- index = config.get(SinkConfig.INDEX);
+ public IndexInfo(String index, ReadonlyConfig config) {
+ this.index = index;
type = config.get(SinkConfig.INDEX_TYPE);
if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new
String[0]);
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 584c373ae7..2f7eb86b91 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -36,6 +36,7 @@ import lombok.NonNull;
import java.time.temporal.Temporal;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -82,7 +83,7 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
private String serializeUpsert(SeaTunnelRow row) {
String key = keyExtractor.apply(row);
- Map<String, Object> document = toDocumentMap(row);
+ Map<String, Object> document = toDocumentMap(row, seaTunnelRowType);
String documentStr;
try {
@@ -161,22 +162,44 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
.toString();
}
- private Map<String, Object> toDocumentMap(SeaTunnelRow row) {
- String[] fieldNames = seaTunnelRowType.getFieldNames();
+ private Map<String, Object> toDocumentMap(SeaTunnelRow row,
SeaTunnelRowType rowType) {
+ String[] fieldNames = rowType.getFieldNames();
Map<String, Object> doc = new HashMap<>(fieldNames.length);
Object[] fields = row.getFields();
for (int i = 0; i < fieldNames.length; i++) {
Object value = fields[i];
- if (value instanceof Temporal) {
- // jackson not support jdk8 new time api
- doc.put(fieldNames[i], value.toString());
+ if (value == null) {
+ } else if (value instanceof SeaTunnelRow) {
+ doc.put(
+ fieldNames[i],
+ toDocumentMap(
+ (SeaTunnelRow) value, (SeaTunnelRowType)
rowType.getFieldType(i)));
} else {
- doc.put(fieldNames[i], value);
+ doc.put(fieldNames[i], convertValue(value));
}
}
return doc;
}
+ private Object convertValue(Object value) {
+ if (value instanceof Temporal) {
+ // Jackson has no built-in support for the JDK 8 java.time API, so emit the value's string form
+ return value.toString();
+ } else if (value instanceof Map) {
+ for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+ ((Map) value).put(entry.getKey(),
convertValue(entry.getValue()));
+ }
+ return value;
+ } else if (value instanceof List) {
+ for (int i = 0; i < ((List) value).size(); i++) {
+ ((List) value).set(i, convertValue(((List) value).get(i)));
+ }
+ return value;
+ } else {
+ return value;
+ }
+ }
+
private Map<String, String> createMetadata(@NonNull SeaTunnelRow row,
@NonNull String key) {
Map<String, String> actionMetadata = createMetadata(row);
actionMetadata.put("_id", key);
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 2d23b7dc51..f3acd191ef 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -91,6 +91,9 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
put(
"yyyy-MM-dd HH:mm:ss.SSSSSS".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss.SSSSSS"));
+ put(
+ "yyyy-MM-dd HH:mm:ss.SSSSSSSSS".length(),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss.SSSSSSSSS"));
}
};
@@ -185,6 +188,25 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
convertMap.put(convertKey, convertValue);
}
return convertMap;
+ } else if (fieldType instanceof SeaTunnelRowType) {
+ SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
+ Map<String, Object> collect =
+ mapper.readValue(fieldValue, new TypeReference<Map<String,
Object>>() {});
+ Object[] seaTunnelFields = new Object[rowType.getTotalFields()];
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ String fieldName = rowType.getFieldName(i);
+ SeaTunnelDataType<?> fieldDataType = rowType.getFieldType(i);
+ Object value = collect.get(fieldName);
+ if (value != null) {
+ seaTunnelFields[i] =
+ convertValue(
+ fieldDataType,
+ (value instanceof List || value instanceof
Map)
+ ? mapper.writeValueAsString(value)
+ : value.toString());
+ }
+ }
+ return new SeaTunnelRow(seaTunnelFields);
} else if (fieldType instanceof PrimitiveByteArrayType) {
return Base64.getDecoder().decode(fieldValue);
} else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
@@ -204,7 +226,7 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
} catch (NumberFormatException e) {
// no op
}
- String formatDate = fieldValue.replace("T", " ");
+ String formatDate = fieldValue.replace("T", " ").replace("Z", "");
if (fieldValue.length() == "yyyyMMdd".length()
|| fieldValue.length() == "yyyy-MM-dd".length()) {
formatDate = fieldValue + " 00:00:00";
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index 3d160adc07..d2ca6045eb 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -72,7 +72,7 @@ public class ElasticsearchSink
public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState> createWriter(
SinkWriter.Context context) {
return new ElasticsearchSinkWriter(
- context, catalogTable.getSeaTunnelRowType(), config,
maxBatchSize, maxRetryCount);
+ context, catalogTable, config, maxBatchSize, maxRetryCount);
}
@Override
@@ -89,7 +89,7 @@ public class ElasticsearchSink
SchemaSaveMode schemaSaveMode =
config.get(SinkConfig.SCHEMA_SAVE_MODE);
DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
- TablePath tablePath = TablePath.of("", config.get(SinkConfig.INDEX));
+ TablePath tablePath = TablePath.of("",
catalogTable.getTableId().getTableName());
catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 97548e3fdb..ad2c01e47e 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -26,6 +28,7 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
@@ -72,6 +75,18 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- return () -> new ElasticsearchSink(context.getOptions(),
context.getCatalogTable());
+ String original = context.getOptions().get(INDEX);
+ original =
+ original.replace(
+ REPLACE_TABLE_NAME_KEY,
+ context.getCatalogTable().getTableId().getTableName());
+ CatalogTable newTable =
+ CatalogTable.of(
+ TableIdentifier.of(
+ context.getCatalogTable().getCatalogName(),
+
context.getCatalogTable().getTablePath().getDatabaseName(),
+ original),
+ context.getCatalogTable());
+ return () -> new ElasticsearchSink(context.getOptions(), newTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 35ed49d498..6edac760c1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -19,9 +19,9 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
@@ -61,18 +61,20 @@ public class ElasticsearchSinkWriter
public ElasticsearchSinkWriter(
SinkWriter.Context context,
- SeaTunnelRowType seaTunnelRowType,
+ CatalogTable catalogTable,
ReadonlyConfig config,
int maxBatchSize,
int maxRetryCount) {
this.context = context;
this.maxBatchSize = maxBatchSize;
- IndexInfo indexInfo = new IndexInfo(config);
+ IndexInfo indexInfo = new
IndexInfo(catalogTable.getTableId().getTableName(), config);
esRestClient = EsRestClient.createInstance(config);
this.seaTunnelRowSerializer =
new ElasticsearchRowSerializer(
- esRestClient.getClusterInfo(), indexInfo,
seaTunnelRowType);
+ esRestClient.getClusterInfo(),
+ indexInfo,
+ catalogTable.getSeaTunnelRowType());
this.requestEsList = new ArrayList<>(maxBatchSize);
this.retryMaterial =
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index e99e66e420..9909c9bba9 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -30,10 +30,12 @@ import
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchDataTypeConvertor;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
import java.util.Arrays;
@@ -62,16 +64,14 @@ public class ElasticsearchSource
} else {
source = config.get(SourceConfig.SOURCE);
EsRestClient esRestClient = EsRestClient.createInstance(config);
- Map<String, String> esFieldType =
+ Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
esRestClient.close();
SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType[source.size()];
- ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
- new ElasticSearchDataTypeConvertor();
for (int i = 0; i < source.size(); i++) {
- String esType = esFieldType.get(source.get(i));
+ BasicTypeDefine<EsType> esType =
esFieldType.get(source.get(i));
SeaTunnelDataType<?> seaTunnelDataType =
-
elasticSearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
+
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
fieldTypes[i] = seaTunnelDataType;
}
TableSchema.Builder builder = TableSchema.builder();
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index 6efa5bba4c..5a269e0737 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -49,7 +49,7 @@ public class ElasticsearchRowSerializerTest {
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
- IndexInfo indexInfo = new IndexInfo(pluginConf);
+ IndexInfo indexInfo = new IndexInfo(index, pluginConf);
SeaTunnelRowType schema =
new SeaTunnelRowType(
new String[] {primaryKey, "name"},
@@ -88,7 +88,7 @@ public class ElasticsearchRowSerializerTest {
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
- IndexInfo indexInfo = new IndexInfo(pluginConf);
+ IndexInfo indexInfo = new IndexInfo(index, pluginConf);
SeaTunnelRowType schema =
new SeaTunnelRowType(
new String[] {"id", "name"},
@@ -127,7 +127,7 @@ public class ElasticsearchRowSerializerTest {
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
- IndexInfo indexInfo = new IndexInfo(pluginConf);
+ IndexInfo indexInfo = new IndexInfo(index, pluginConf);
SeaTunnelRowType schema =
new SeaTunnelRowType(
new String[] {primaryKey, "name"},
@@ -165,7 +165,7 @@ public class ElasticsearchRowSerializerTest {
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
- IndexInfo indexInfo = new IndexInfo(pluginConf);
+ IndexInfo indexInfo = new IndexInfo(index, pluginConf);
SeaTunnelRowType schema =
new SeaTunnelRowType(
new String[] {primaryKey, "name"},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ddd106451f..0a8d51c7ab 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -26,10 +26,14 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -48,6 +52,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -78,7 +83,7 @@ public class ElasticsearchIT extends TestSuiteBase implements
TestResource {
public void startUp() throws Exception {
container =
new ElasticsearchContainer(
- DockerImageName.parse("elasticsearch:8.0.0")
+ DockerImageName.parse("elasticsearch:8.9.0")
.asCompatibleSubstituteFor(
"docker.elastic.co/elasticsearch/elasticsearch"))
.withNetwork(NETWORK)
@@ -89,7 +94,7 @@ public class ElasticsearchIT extends TestSuiteBase implements
TestResource {
.withStartupTimeout(Duration.ofMinutes(5))
.withLogConsumer(
new Slf4jLogConsumer(
-
DockerLoggerFactory.getLogger("elasticsearch:8.0.0")));
+
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
Startables.deepStart(Stream.of(container)).join();
log.info("Elasticsearch container started");
esRestClient =
@@ -105,6 +110,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Optional.empty());
testDataset = generateTestDataSet();
createIndexDocs();
+ createIndexWithFullType();
}
/** create a index,and bulk some documents */
@@ -125,6 +131,31 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
esRestClient.bulk(requestBody.toString());
}
+ private void createIndexWithFullType() throws IOException,
InterruptedException {
+ String mapping =
+ IOUtils.toString(
+ ContainerUtil.getResourcesFile(
+
"/elasticsearch/st_index_full_type_mapping.json")
+ .toURI(),
+ StandardCharsets.UTF_8);
+ esRestClient.createIndex("st_index_full_type", mapping);
+ BulkResponse response =
+ esRestClient.bulk(
+ "{ \"index\" : { \"_index\" : \"st_index_full_type\",
\"_id\" : \"1\" } }\n"
+ + IOUtils.toString(
+ ContainerUtil.getResourcesFile(
+
"/elasticsearch/st_index_full_type_data.json")
+ .toURI(),
+ StandardCharsets.UTF_8)
+ .replace("\n", "")
+ + "\n");
+ Assertions.assertFalse(response.isErrors(), response.getResponse());
+ // wait for the index to refresh so the bulk-indexed documents are counted
+ Thread.sleep(2000L);
+ Assertions.assertEquals(
+ 2,
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
+ }
+
@TestTemplate
public void testElasticsearch(TestContainer container)
throws IOException, InterruptedException {
@@ -136,6 +167,18 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
+ @TestTemplate
+ public void testElasticsearchWithFullType(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/elasticsearch/elasticsearch_source_and_sink_full_type.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Thread.sleep(2000L);
+ Assertions.assertEquals(
+ 1,
+
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
+ }
+
private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_full_type.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_full_type.conf
new file mode 100644
index 0000000000..4c2ca0fae0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_full_type.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ #checkpoint.interval = 10000
+}
+
+source {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+ index = "st_index_full_type"
+ source = [
+ "aggregate_metric_double",
+ "alias",
+ "binary",
+ "byte",
+ "boolean",
+ "completion",
+ "date",
+ "date_nanos",
+ "dense_vector",
+ "double",
+ "flattened",
+ "float",
+ "geo_point",
+ "geo_shape",
+ "point",
+ "integer_range",
+ "float_range",
+ "long_range",
+ "double_range",
+ "date_range",
+ "ip_range",
+ "half_float",
+ "scaled_float",
+ "histogram",
+ "integer",
+ "ip",
+ "join",
+ "keyword",
+ "long",
+ "nested",
+ "object",
+ "percolator",
+ "rank_feature",
+ "rank_features",
+ "shape",
+ "search_as_you_type",
+ "short",
+ "text",
+ "match_only_text",
+ "name",
+ "unsigned_long",
+ "version"
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+ index = "st_index_full_type_target"
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_data.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_data.json
new file mode 100644
index 0000000000..ace28fbacb
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_data.json
@@ -0,0 +1,137 @@
+{
+ "aggregate_metric_double": {
+ "min": 10,
+ "max": 100,
+ "sum": 1000,
+ "value_count": 5
+ },
+ "binary": "binary_data",
+ "byte": 127,
+ "boolean": true,
+ "completion": {
+ "input": [
+ "search term",
+ "another term"
+ ]
+ },
+ "date": "2024-03-19",
+ "date_nanos": "2024-03-19T12:30:45.123456789Z",
+ "dense_vector": [
+ 1.0,
+ 2.0,
+ 3.0
+ ],
+ "double": 3.14159,
+ "flattened": {
+ "nested_field1": "value1",
+ "nested_field2": "value2"
+ },
+ "float": 3.14,
+ "geo_point": {
+ "lat": 40.7128,
+ "lon": -74.0060
+ },
+ "geo_shape": {
+ "type": "point",
+ "coordinates": [
+ 100.0,
+ 0.0
+ ]
+ },
+ "point": {
+ "type": "Point",
+ "coordinates": [
+ 100.0,
+ 0.0
+ ]
+ },
+ "integer_range": {
+ "gte": 10,
+ "lte": 20
+ },
+ "float_range": {
+ "gte": 1.0,
+ "lte": 5.0
+ },
+ "long_range": {
+ "gte": 100,
+ "lte": 200
+ },
+ "double_range": {
+ "gte": 1.0,
+ "lte": 10.0
+ },
+ "date_range": {
+ "gte": "2024-01-01",
+ "lte": "2024-03-31"
+ },
+ "ip_range": {
+ "gte": "192.0.2.0",
+ "lte": "192.0.2.255"
+ },
+ "half_float": 3.14,
+ "scaled_float": 1.23,
+ "histogram": {
+ "values": [
+ 0.1,
+ 0.2,
+ 0.3,
+ 0.4,
+ 0.5
+ ],
+ "counts": [
+ 3,
+ 7,
+ 23,
+ 12,
+ 6
+ ]
+ },
+ "integer": 42,
+ "ip": "192.0.2.1",
+ "join": {
+ "name": "question"
+ },
+ "keyword": "keyword_value",
+ "long": 1234567890,
+ "nested": {
+ "nested_field1": "value1",
+ "nested_field2": "value2"
+ },
+ "object": {
+ "age": 30,
+ "name": {
+ "first": "John",
+ "last": "Doe"
+ }
+ },
+ "percolator": {
+ "match": {
+ "keyword": "keyword_value"
+ }
+ },
+ "rank_feature": 5.0,
+ "rank_features": {
+ "feature1": 10.0,
+ "feature2": 20.0
+ },
+ "shape": "POINT (-377.03653 389.897676)",
+ "search_as_you_type": "searchable text",
+ "short": 32767,
+ "sparse_vector": {
+ "index": [
+ 0,
+ 2,
+ 4
+ ],
+ "values": [
+ 1.0,
+ 2.0,
+ 3.0
+ ]
+ },
+ "text": "full text",
+ "match_only_text": "match only text",
+ "name": "John Doe",
+ "version": "1.0"
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_mapping.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_mapping.json
new file mode 100644
index 0000000000..bf7df0d279
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_mapping.json
@@ -0,0 +1,162 @@
+{
+ "mappings": {
+ "properties": {
+ "aggregate_metric_double": {
+ "type": "aggregate_metric_double",
+ "metrics": [
+ "min",
+ "max",
+ "sum",
+ "value_count"
+ ],
+ "default_metric": "max"
+ },
+ "alias": {
+ "type": "alias",
+ "path": "aggregate_metric_double"
+ },
+ "binary": {
+ "type": "binary"
+ },
+ "byte": {
+ "type": "byte"
+ },
+ "boolean": {
+ "type": "boolean"
+ },
+ "completion": {
+ "type": "completion"
+ },
+ "date": {
+ "type": "date"
+ },
+ "date_nanos": {
+ "type": "date_nanos"
+ },
+ "dense_vector": {
+ "type": "dense_vector",
+ "dims": 3
+ },
+ "double": {
+ "type": "double"
+ },
+ "flattened": {
+ "type": "flattened"
+ },
+ "float": {
+ "type": "float"
+ },
+ "geo_point": {
+ "type": "geo_point"
+ },
+ "geo_shape": {
+ "type": "geo_shape"
+ },
+ "point": {
+ "type": "point"
+ },
+ "integer_range": {
+ "type": "integer_range"
+ },
+ "float_range": {
+ "type": "float_range"
+ },
+ "long_range": {
+ "type": "long_range"
+ },
+ "double_range": {
+ "type": "double_range"
+ },
+ "date_range": {
+ "type": "date_range"
+ },
+ "ip_range": {
+ "type": "ip_range"
+ },
+ "half_float": {
+ "type": "half_float"
+ },
+ "scaled_float": {
+ "type": "scaled_float",
+ "scaling_factor": 100
+ },
+ "histogram": {
+ "type": "histogram"
+ },
+ "integer": {
+ "type": "integer"
+ },
+ "ip": {
+ "type": "ip"
+ },
+ "join": {
+ "type": "join",
+ "relations": {
+ "question": "answer"
+ }
+ },
+ "keyword": {
+ "type": "keyword"
+ },
+ "long": {
+ "type": "long"
+ },
+ "nested": {
+ "type": "nested"
+ },
+ "object": {
+ "properties": {
+ "age": {
+ "type": "integer"
+ },
+ "name": {
+ "properties": {
+ "first": {
+ "type": "text"
+ },
+ "last": {
+ "type": "text"
+ }
+ }
+ }
+ }
+ },
+ "percolator": {
+ "type": "percolator"
+ },
+ "rank_feature": {
+ "type": "rank_feature"
+ },
+ "rank_features": {
+ "type": "rank_features"
+ },
+ "shape": {
+ "type": "shape"
+ },
+ "search_as_you_type": {
+ "type": "search_as_you_type"
+ },
+ "short": {
+ "type": "short"
+ },
+ "text": {
+ "type": "text"
+ },
+ "match_only_text": {
+ "type": "text"
+ },
+ "name": {
+ "type": "text",
+ "fields": {
+ "length": {
+ "type": "token_count",
+ "analyzer": "standard"
+ }
+ }
+ },
+ "version": {
+ "type": "version"
+ }
+ }
+ }
+}
\ No newline at end of file