This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 505c1252bd [Improve][Connector-V2] Add ElasticSearch type converter 
(#6546)
505c1252bd is described below

commit 505c1252bd35ef4ec07ec31cdecb92b660000a00
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 29 10:44:34 2024 +0800

    [Improve][Connector-V2] Add ElasticSearch type converter (#6546)
---
 .../catalog/ElasticSearchCatalog.java              |  11 +-
 .../catalog/ElasticSearchDataTypeConvertor.java    |  89 ++---
 .../catalog/ElasticSearchTypeConverter.java        | 364 +++++++++++++++++++++
 .../elasticsearch/client/EsRestClient.java         | 134 ++++++--
 .../seatunnel/elasticsearch/client/EsType.java     |  76 +++++
 .../seatunnel/elasticsearch/dto/IndexInfo.java     |   4 +-
 .../serialize/ElasticsearchRowSerializer.java      |  37 ++-
 .../source/DefaultSeaTunnelRowDeserializer.java    |  24 +-
 .../elasticsearch/sink/ElasticsearchSink.java      |   4 +-
 .../sink/ElasticsearchSinkFactory.java             |  17 +-
 .../sink/ElasticsearchSinkWriter.java              |  10 +-
 .../elasticsearch/source/ElasticsearchSource.java  |  12 +-
 .../serialize/ElasticsearchRowSerializerTest.java  |   8 +-
 .../connector/elasticsearch/ElasticsearchIT.java   |  47 ++-
 .../elasticsearch_source_and_sink_full_type.conf   |  97 ++++++
 .../elasticsearch/st_index_full_type_data.json     | 137 ++++++++
 .../elasticsearch/st_index_full_type_mapping.json  | 162 +++++++++
 17 files changed, 1104 insertions(+), 129 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 066a69c2dc..b1eb60e289 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -32,7 +32,9 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
 
@@ -146,10 +148,8 @@ public class ElasticSearchCatalog implements Catalog {
             throws CatalogException, TableNotExistException {
         // Get the index mapping?
         checkNotNull(tablePath, "tablePath cannot be null");
-        ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
-                new ElasticSearchDataTypeConvertor();
         TableSchema.Builder builder = TableSchema.builder();
-        Map<String, String> fieldTypeMapping =
+        Map<String, BasicTypeDefine<EsType>> fieldTypeMapping =
                 esRestClient.getFieldTypeMapping(tablePath.getTableName(), 
Collections.emptyList());
         buildColumnsWithErrorCheck(
                 tablePath,
@@ -159,8 +159,9 @@ public class ElasticSearchCatalog implements Catalog {
                     // todo: we need to add a new type TEXT or add length in 
STRING type
                     return PhysicalColumn.of(
                             nameAndType.getKey(),
-                            elasticSearchDataTypeConvertor.toSeaTunnelType(
-                                    nameAndType.getKey(), 
nameAndType.getValue()),
+                            ElasticSearchTypeConverter.INSTANCE
+                                    .convert(nameAndType.getValue())
+                                    .getDataType(),
                             (Long) null,
                             true,
                             null,
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
index 0e081f83ef..7aecdfb9ea 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchDataTypeConvertor.java
@@ -17,11 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
 
 import com.google.auto.service.AutoService;
 
@@ -29,22 +30,11 @@ import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+/** @deprecated instead by {@link ElasticSearchTypeConverter} */
+@Deprecated
 @AutoService(DataTypeConvertor.class)
 public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<String> {
 
-    public static final String STRING = "string";
-    public static final String KEYWORD = "keyword";
-    public static final String TEXT = "text";
-    public static final String BOOLEAN = "boolean";
-    public static final String BYTE = "byte";
-    public static final String SHORT = "short";
-    public static final String INTEGER = "integer";
-    public static final String LONG = "long";
-    public static final String FLOAT = "float";
-    public static final String HALF_FLOAT = "half_float";
-    public static final String DOUBLE = "double";
-    public static final String DATE = "date";
-
     @Override
     public SeaTunnelDataType<?> toSeaTunnelType(String field, String 
connectorDataType) {
         return toSeaTunnelType(field, connectorDataType, null);
@@ -54,34 +44,14 @@ public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<String>
     public SeaTunnelDataType<?> toSeaTunnelType(
             String field, String connectorDataType, Map<String, Object> 
dataTypeProperties) {
         checkNotNull(connectorDataType, "connectorDataType can not be null");
-        switch (connectorDataType) {
-            case STRING:
-                return BasicType.STRING_TYPE;
-            case KEYWORD:
-                return BasicType.STRING_TYPE;
-            case TEXT:
-                return BasicType.STRING_TYPE;
-            case BOOLEAN:
-                return BasicType.BOOLEAN_TYPE;
-            case BYTE:
-                return BasicType.BYTE_TYPE;
-            case SHORT:
-                return BasicType.SHORT_TYPE;
-            case INTEGER:
-                return BasicType.INT_TYPE;
-            case LONG:
-                return BasicType.LONG_TYPE;
-            case FLOAT:
-                return BasicType.FLOAT_TYPE;
-            case HALF_FLOAT:
-                return BasicType.FLOAT_TYPE;
-            case DOUBLE:
-                return BasicType.DOUBLE_TYPE;
-            case DATE:
-                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
-            default:
-                return BasicType.STRING_TYPE;
-        }
+        BasicTypeDefine<EsType> typeDefine =
+                BasicTypeDefine.<EsType>builder()
+                        .name(field)
+                        .columnType(connectorDataType)
+                        .dataType(connectorDataType)
+                        .build();
+
+        return 
ElasticSearchTypeConverter.INSTANCE.convert(typeDefine).getDataType();
     }
 
     @Override
@@ -90,29 +60,14 @@ public class ElasticSearchDataTypeConvertor implements 
DataTypeConvertor<String>
             SeaTunnelDataType<?> seaTunnelDataType,
             Map<String, Object> dataTypeProperties) {
         checkNotNull(seaTunnelDataType, "seaTunnelDataType can not be null");
-        SqlType sqlType = seaTunnelDataType.getSqlType();
-        switch (sqlType) {
-            case STRING:
-                return STRING;
-            case BOOLEAN:
-                return BOOLEAN;
-            case BYTES:
-                return BYTE;
-            case TINYINT:
-                return SHORT;
-            case INT:
-                return INTEGER;
-            case BIGINT:
-                return LONG;
-            case FLOAT:
-                return FLOAT;
-            case DOUBLE:
-                return DOUBLE;
-            case TIMESTAMP:
-                return DATE;
-            default:
-                return STRING;
-        }
+        Column column =
+                PhysicalColumn.builder()
+                        .name(field)
+                        .dataType(seaTunnelDataType)
+                        .nullable(true)
+                        .build();
+        BasicTypeDefine<EsType> typeDefine = 
ElasticSearchTypeConverter.INSTANCE.reconvert(column);
+        return typeDefine.getColumnType();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
new file mode 100644
index 0000000000..c7e21d1385
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeConverter;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.AGGREGATE_METRIC_DOUBLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.BINARY;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.BOOLEAN;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.BYTE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.COMPLETION;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_RANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DOUBLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DOUBLE_RANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.FLATTENED;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.FLOAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.FLOAT_RANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.GEO_POINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.GEO_SHAPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.HALF_FLOAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.HISTOGRAM;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.INTEGER;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.INTEGER_RANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.IP;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.IP_RANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.JOIN;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.KEYWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.RANK_FEATURE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.RANK_FEATURES;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SEARCH_AS_YOU_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SHAPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SHORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.SPARSE_VECTOR;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.STRING;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.TEXT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.TOKEN_COUNT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.UNSIGNED_LONG;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.VERSION;
+
+@AutoService(TypeConverter.class)
+public class ElasticSearchTypeConverter implements 
BasicTypeConverter<BasicTypeDefine<EsType>> {
+    public static final ElasticSearchTypeConverter INSTANCE = new 
ElasticSearchTypeConverter();
+
+    @Override
+    public String identifier() {
+        return "Elasticsearch";
+    }
+
+    @Override
+    public Column convert(BasicTypeDefine<EsType> typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .sourceType(typeDefine.getColumnType())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+        String type = typeDefine.getDataType().toLowerCase();
+        switch (type) {
+            case AGGREGATE_METRIC_DOUBLE:
+                List<String> metrics =
+                        (List<String>) 
typeDefine.getNativeType().getOptions().get("metrics");
+                builder.dataType(
+                        new SeaTunnelRowType(
+                                metrics.toArray(new String[0]),
+                                metrics.stream()
+                                        .map(s -> BasicType.DOUBLE_TYPE)
+                                        
.toArray(SeaTunnelDataType<?>[]::new)));
+                break;
+            case DENSE_VECTOR:
+                String elementType =
+                        
typeDefine.getNativeType().getOptions().get("element_type").toString();
+                if (elementType.equals("byte")) {
+                    builder.dataType(ArrayType.BYTE_ARRAY_TYPE);
+                } else {
+                    builder.dataType(ArrayType.FLOAT_ARRAY_TYPE);
+                }
+                break;
+            case BYTE:
+                builder.dataType(BasicType.BYTE_TYPE);
+                break;
+            case BOOLEAN:
+                builder.dataType(BasicType.BOOLEAN_TYPE);
+                break;
+            case DATE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(3);
+                break;
+            case DATE_NANOS:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(9);
+                break;
+            case DOUBLE:
+            case RANK_FEATURE:
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case FLOAT:
+            case HALF_FLOAT:
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case HISTOGRAM:
+                SeaTunnelRowType rowType =
+                        new SeaTunnelRowType(
+                                new String[] {"values", "counts"},
+                                new SeaTunnelDataType<?>[] {
+                                    ArrayType.DOUBLE_ARRAY_TYPE, 
ArrayType.LONG_ARRAY_TYPE
+                                });
+                builder.dataType(rowType);
+                break;
+            case INTEGER:
+            case TOKEN_COUNT:
+                builder.dataType(BasicType.INT_TYPE);
+                break;
+            case LONG:
+                builder.dataType(BasicType.LONG_TYPE);
+                break;
+            case SHORT:
+                builder.dataType(BasicType.SHORT_TYPE);
+                break;
+            case OBJECT:
+                Map<String, BasicTypeDefine<EsType>> typeInfo =
+                        (Map) typeDefine.getNativeType().getOptions();
+                SeaTunnelRowType object =
+                        new SeaTunnelRowType(
+                                typeInfo.keySet().toArray(new String[0]),
+                                typeInfo.values().stream()
+                                        .map(this::convert)
+                                        .map(Column::getDataType)
+                                        .toArray(SeaTunnelDataType<?>[]::new));
+                builder.dataType(object);
+                break;
+            case INTEGER_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, 
BasicType.INT_TYPE));
+                break;
+            case FLOAT_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, 
BasicType.FLOAT_TYPE));
+                break;
+            case LONG_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, 
BasicType.LONG_TYPE));
+                break;
+            case DOUBLE_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, 
BasicType.DOUBLE_TYPE));
+                break;
+            case DATE_RANGE:
+                builder.dataType(
+                        new MapType<>(BasicType.STRING_TYPE, 
LocalTimeType.LOCAL_DATE_TIME_TYPE));
+                break;
+            case IP_RANGE:
+                builder.dataType(new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE));
+                break;
+            case UNSIGNED_LONG:
+                builder.dataType(new DecimalType(20, 0));
+                builder.columnLength(20L);
+                builder.scale(0);
+                break;
+            case TEXT:
+            case BINARY:
+            case VERSION:
+            case IP:
+            case JOIN:
+            case KEYWORD:
+            case FLATTENED:
+            case GEO_POINT:
+            case COMPLETION:
+            case STRING:
+            case GEO_SHAPE:
+            case NESTED:
+            case PERCOLATOR:
+            case POINT:
+            case RANK_FEATURES:
+            case SEARCH_AS_YOU_TYPE:
+            case SPARSE_VECTOR:
+            case MATCH_ONLY_TEXT:
+            case SHAPE:
+            default:
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+        }
+        return builder.build();
+    }
+
+    @Override
+    public BasicTypeDefine<EsType> reconvert(Column column) {
+        BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder =
+                BasicTypeDefine.<EsType>builder()
+                        .name(column.getName())
+                        .nullable(column.isNullable())
+                        .comment(column.getComment())
+                        .defaultValue(column.getDefaultValue());
+        switch (column.getDataType().getSqlType()) {
+            case BOOLEAN:
+                builder.columnType(BOOLEAN);
+                builder.dataType(BOOLEAN);
+                builder.nativeType(new EsType(BOOLEAN, new HashMap<>()));
+                break;
+            case BYTES:
+                builder.columnType(BINARY);
+                builder.dataType(BINARY);
+                builder.nativeType(new EsType(BINARY, new HashMap<>()));
+                break;
+            case TINYINT:
+                builder.columnType(BYTE);
+                builder.dataType(BYTE);
+                builder.nativeType(new EsType(BYTE, new HashMap<>()));
+                break;
+            case SMALLINT:
+                builder.columnType(SHORT);
+                builder.dataType(SHORT);
+                builder.nativeType(new EsType(SHORT, new HashMap<>()));
+                break;
+            case INT:
+                builder.columnType(INTEGER);
+                builder.dataType(INTEGER);
+                builder.nativeType(new EsType(INTEGER, new HashMap<>()));
+                break;
+            case BIGINT:
+                builder.columnType(LONG);
+                builder.dataType(LONG);
+                builder.nativeType(new EsType(LONG, new HashMap<>()));
+                break;
+            case FLOAT:
+                builder.columnType(FLOAT);
+                builder.dataType(FLOAT);
+                builder.nativeType(new EsType(FLOAT, new HashMap<>()));
+                break;
+            case DOUBLE:
+                builder.columnType(DOUBLE);
+                builder.dataType(DOUBLE);
+                builder.nativeType(new EsType(DOUBLE, new HashMap<>()));
+                break;
+            case DATE:
+            case TIMESTAMP:
+                Map<String, Object> option = new HashMap<>();
+                if (column.getScale() != null && column.getScale() > 3) {
+                    option.put("format", 
"strict_date_optional_time||epoch_millis");
+                    builder.columnType(DATE_NANOS);
+                    builder.dataType(DATE_NANOS);
+                    builder.nativeType(new EsType(DATE_NANOS, option));
+                } else {
+                    option.put("format", 
"strict_date_optional_time_nanos||epoch_millis");
+                    builder.columnType(DATE);
+                    builder.dataType(DATE);
+                    builder.nativeType(new EsType(DATE, option));
+                }
+                break;
+            case DECIMAL:
+                builder.columnType(TEXT);
+                builder.dataType(TEXT);
+                builder.nativeType(new EsType(TEXT, new HashMap<>()));
+                break;
+            case MAP:
+                builder.columnType(FLATTENED);
+                builder.dataType(FLATTENED);
+                builder.nativeType(new EsType(FLATTENED, new HashMap<>()));
+                break;
+            case ARRAY:
+                BasicType type = ((ArrayType) 
column.getDataType()).getElementType();
+                if (type.equals(BasicType.BYTE_TYPE)) {
+                    builder.columnType(BINARY);
+                    builder.dataType(BINARY);
+                    builder.nativeType(new EsType(BINARY, new HashMap<>()));
+                } else if (type.equals(BasicType.SHORT_TYPE)) {
+                    builder.columnType(SHORT);
+                    builder.dataType(SHORT);
+                    builder.nativeType(new EsType(SHORT, new HashMap<>()));
+                } else if (type.equals(BasicType.INT_TYPE)) {
+                    builder.columnType(INTEGER);
+                    builder.dataType(INTEGER);
+                    builder.nativeType(new EsType(INTEGER, new HashMap<>()));
+                } else if (type.equals(BasicType.LONG_TYPE)) {
+                    builder.columnType(LONG);
+                    builder.dataType(LONG);
+                    builder.nativeType(new EsType(LONG, new HashMap<>()));
+                } else if (type.equals(BasicType.FLOAT_TYPE)) {
+                    builder.columnType(FLOAT);
+                    builder.dataType(FLOAT);
+                    builder.nativeType(new EsType(FLOAT, new HashMap<>()));
+                } else if (type.equals(BasicType.DOUBLE_TYPE)) {
+                    builder.columnType(DOUBLE);
+                    builder.dataType(DOUBLE);
+                    builder.nativeType(new EsType(DOUBLE, new HashMap<>()));
+                } else if (type.equals(BasicType.STRING_TYPE)) {
+                    builder.columnType(TEXT);
+                    builder.dataType(TEXT);
+                    builder.nativeType(new EsType(TEXT, new HashMap<>()));
+                } else {
+                    builder.columnType(TEXT);
+                    builder.dataType(TEXT);
+                    builder.nativeType(new EsType(TEXT, new HashMap<>()));
+                }
+                break;
+            case ROW:
+                builder.columnType(OBJECT);
+                builder.dataType(OBJECT);
+                SeaTunnelRowType row = (SeaTunnelRowType) column.getDataType();
+                Map<String, BasicTypeDefine<EsType>> typeInfo = new 
HashMap<>();
+                for (int i = 0; i < row.getFieldNames().length; i++) {
+                    typeInfo.put(
+                            row.getFieldName(i),
+                            reconvert(
+                                    PhysicalColumn.of(
+                                            row.getFieldName(i),
+                                            row.getFieldType(i),
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            null)));
+                }
+                builder.nativeType(new EsType(OBJECT, (Map) typeInfo));
+                break;
+            case TIME:
+            case NULL:
+            case STRING:
+            default:
+                builder.columnType(TEXT);
+                builder.dataType(TEXT);
+                builder.nativeType(new EsType(TEXT, new HashMap<>()));
+        }
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 50c47d1334..18c9b7c109 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -19,10 +19,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
 
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
@@ -34,6 +36,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.Elastic
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.SSLUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
 import org.apache.http.auth.AuthScope;
@@ -65,6 +68,13 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.AGGREGATE_METRIC_DOUBLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.ALIAS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
+
 @Slf4j
 public class EsRestClient {
 
@@ -174,7 +184,7 @@ public class EsRestClient {
                                             keystorePassword,
                                             truststorePath,
                                             truststorePassword);
-                            sslContext.ifPresent(e -> 
httpClientBuilder.setSSLContext(e));
+                            
sslContext.ifPresent(httpClientBuilder::setSSLContext);
                         } else {
                             SSLContext sslContext =
                                     SSLContexts.custom()
@@ -237,7 +247,7 @@ public class EsRestClient {
                     .clusterVersion(versionNode.get("number").asText())
                     .distribution(
                             
Optional.ofNullable(versionNode.get("distribution"))
-                                    .map(e -> e.asText())
+                                    .map(JsonNode::asText)
                                     .orElse(null))
                     .build();
         } catch (IOException e) {
@@ -276,9 +286,7 @@ public class EsRestClient {
         param.put("sort", new String[] {"_doc"});
         param.put("size", scrollSize);
         String endpoint = "/" + index + "/_search?scroll=" + scrollTime;
-        ScrollResult scrollResult =
-                getDocsFromScrollRequest(endpoint, 
JsonUtils.toJsonString(param));
-        return scrollResult;
+        return getDocsFromScrollRequest(endpoint, 
JsonUtils.toJsonString(param));
     }
 
     /**
@@ -291,9 +299,7 @@ public class EsRestClient {
         Map<String, String> param = new HashMap<>();
         param.put("scroll_id", scrollId);
         param.put("scroll", scrollTime);
-        ScrollResult scrollResult =
-                getDocsFromScrollRequest("/_search/scroll", 
JsonUtils.toJsonString(param));
-        return scrollResult;
+        return getDocsFromScrollRequest("/_search/scroll", 
JsonUtils.toJsonString(param));
     }
 
     private ScrollResult getDocsFromScrollRequest(String endpoint, String 
requestBody) {
@@ -319,8 +325,7 @@ public class EsRestClient {
                                 "POST %s,total shards(%d)!= successful 
shards(%d)",
                                 endpoint, totalShards, successful));
 
-                ScrollResult scrollResult = 
getDocsFromScrollResponse(responseJson);
-                return scrollResult;
+                return getDocsFromScrollResponse(responseJson);
             } else {
                 throw new ElasticsearchConnectorException(
                         ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
@@ -345,13 +350,11 @@ public class EsRestClient {
         List<Map<String, Object>> docs = new ArrayList<>(hitsNode.size());
         scrollResult.setDocs(docs);
 
-        Iterator<JsonNode> iter = hitsNode.iterator();
-        while (iter.hasNext()) {
+        for (JsonNode jsonNode : hitsNode) {
             Map<String, Object> doc = new HashMap<>();
-            JsonNode hitNode = iter.next();
-            doc.put("_index", hitNode.get("_index").textValue());
-            doc.put("_id", hitNode.get("_id").textValue());
-            JsonNode source = hitNode.get("_source");
+            doc.put("_index", jsonNode.get("_index").textValue());
+            doc.put("_id", jsonNode.get("_id").textValue());
+            JsonNode source = jsonNode.get("_source");
             for (Iterator<Map.Entry<String, JsonNode>> iterator = 
source.fields();
                     iterator.hasNext(); ) {
                 Map.Entry<String, JsonNode> entry = iterator.next();
@@ -379,9 +382,7 @@ public class EsRestClient {
             }
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                 String entity = EntityUtils.toString(response.getEntity());
-                List<IndexDocsCount> indexDocsCounts =
-                        JsonUtils.toList(entity, IndexDocsCount.class);
-                return indexDocsCounts;
+                return JsonUtils.toList(entity, IndexDocsCount.class);
             } else {
                 throw new ElasticsearchConnectorException(
                         
ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
@@ -423,10 +424,16 @@ public class EsRestClient {
         }
     }
 
-    // todo: We don't support set the index mapping now.
     public void createIndex(String indexName) {
+        createIndex(indexName, null);
+    }
+
+    public void createIndex(String indexName, String mapping) {
         String endpoint = String.format("/%s", indexName);
         Request request = new Request("PUT", endpoint);
+        if (StringUtils.isNotEmpty(mapping)) {
+            request.setJsonEntity(mapping);
+        }
         try {
             Response response = restClient.performRequest(request);
             if (response == null) {
@@ -479,10 +486,11 @@ public class EsRestClient {
      * @param index index name
      * @return {key-> field name,value->es type}
      */
-    public Map<String, String> getFieldTypeMapping(String index, List<String> 
source) {
+    public Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(
+            String index, List<String> source) {
         String endpoint = String.format("/%s/_mappings", index);
         Request request = new Request("GET", endpoint);
-        Map<String, String> mapping = new HashMap<>();
+        Map<String, BasicTypeDefine<EsType>> mapping = new HashMap<>();
         try {
             Response response = restClient.performRequest(request);
             if (response == null) {
@@ -525,9 +533,9 @@ public class EsRestClient {
         return mapping;
     }
 
-    private static Map<String, String> getFieldTypeMappingFromProperties(
+    private static Map<String, BasicTypeDefine<EsType>> 
getFieldTypeMappingFromProperties(
             JsonNode properties, List<String> source) {
-        Map<String, String> allElasticSearchFieldTypeInfoMap = new HashMap<>();
+        Map<String, BasicTypeDefine<EsType>> allElasticSearchFieldTypeInfoMap 
= new HashMap<>();
         properties
                 .fields()
                 .forEachRemaining(
@@ -535,26 +543,96 @@ public class EsRestClient {
                             String fieldName = entry.getKey();
                             JsonNode fieldProperty = entry.getValue();
                             if (fieldProperty.has("type")) {
-                                allElasticSearchFieldTypeInfoMap.put(
-                                        fieldName, 
fieldProperty.get("type").asText());
+                                String type = 
fieldProperty.get("type").asText();
+                                BasicTypeDefine.BasicTypeDefineBuilder<EsType> 
typeDefine =
+                                        BasicTypeDefine.<EsType>builder()
+                                                .name(fieldName)
+                                                .columnType(type)
+                                                .dataType(type);
+                                if 
(type.equalsIgnoreCase(AGGREGATE_METRIC_DOUBLE)) {
+                                    ArrayNode metrics = ((ArrayNode) 
fieldProperty.get("metrics"));
+                                    List<String> metricsList = new 
ArrayList<>();
+                                    for (JsonNode node : metrics) {
+                                        metricsList.add(node.asText());
+                                    }
+                                    Map<String, Object> options = new 
HashMap<>();
+                                    options.put("metrics", metricsList);
+                                    typeDefine.nativeType(new EsType(type, 
options));
+                                } else if (type.equalsIgnoreCase(ALIAS)) {
+                                    String path = 
fieldProperty.get("path").asText();
+                                    Map<String, Object> options = new 
HashMap<>();
+                                    options.put("path", path);
+                                    typeDefine.nativeType(new EsType(type, 
options));
+                                } else if 
(type.equalsIgnoreCase(DENSE_VECTOR)) {
+                                    String elementType =
+                                            fieldProperty.get("element_type") 
== null
+                                                    ? "float"
+                                                    : 
fieldProperty.get("element_type").asText();
+                                    Map<String, Object> options = new 
HashMap<>();
+                                    options.put("element_type", elementType);
+                                    typeDefine.nativeType(new EsType(type, 
options));
+                                } else if (type.equalsIgnoreCase(DATE)
+                                        || type.equalsIgnoreCase(DATE_NANOS)) {
+                                    String format =
+                                            fieldProperty.get("format") != null
+                                                    ? 
fieldProperty.get("format").asText()
+                                                    : 
"strict_date_optional_time_nanos||epoch_millis";
+                                    Map<String, Object> options = new 
HashMap<>();
+                                    options.put("format", format);
+                                    typeDefine.nativeType(new EsType(type, 
options));
+                                } else {
+                                    typeDefine.nativeType(new EsType(type, new 
HashMap<>()));
+                                }
+                                
allElasticSearchFieldTypeInfoMap.put(fieldName, typeDefine.build());
+                            } else if (fieldProperty.has("properties")) {
+                                // it should be object type
+                                JsonNode propertiesNode = 
fieldProperty.get("properties");
+                                List<String> fields = new ArrayList<>();
+                                
propertiesNode.fieldNames().forEachRemaining(fields::add);
+                                Map<String, BasicTypeDefine<EsType>> 
subFieldTypeInfoMap =
+                                        
getFieldTypeMappingFromProperties(propertiesNode, fields);
+                                BasicTypeDefine.BasicTypeDefineBuilder<EsType> 
typeDefine =
+                                        BasicTypeDefine.<EsType>builder()
+                                                .name(fieldName)
+                                                .columnType(OBJECT)
+                                                .dataType(OBJECT);
+                                typeDefine.nativeType(
+                                        new EsType(OBJECT, (Map) 
subFieldTypeInfoMap));
+                                
allElasticSearchFieldTypeInfoMap.put(fieldName, typeDefine.build());
                             }
                         });
         if (CollectionUtils.isEmpty(source)) {
             return allElasticSearchFieldTypeInfoMap;
         }
 
+        allElasticSearchFieldTypeInfoMap.forEach(
+                (fieldName, fieldType) -> {
+                    if (fieldType.getDataType().equalsIgnoreCase(ALIAS)) {
+                        BasicTypeDefine<EsType> type =
+                                allElasticSearchFieldTypeInfoMap.get(
+                                        
fieldType.getNativeType().getOptions().get("path"));
+                        if (type != null) {
+                            allElasticSearchFieldTypeInfoMap.put(fieldName, 
type);
+                        }
+                    }
+                });
+
         return source.stream()
                 .collect(
                         Collectors.toMap(
                                 Function.identity(),
                                 fieldName -> {
-                                    String fieldType =
+                                    BasicTypeDefine<EsType> fieldType =
                                             
allElasticSearchFieldTypeInfoMap.get(fieldName);
                                     if (fieldType == null) {
                                         log.warn(
                                                 "fail to get elasticsearch 
field {} mapping type,so give a default type text",
                                                 fieldName);
-                                        return "text";
+                                        return 
BasicTypeDefine.<EsType>builder()
+                                                .name(fieldName)
+                                                .columnType("text")
+                                                .dataType("text")
+                                                .build();
                                     }
                                     return fieldType;
                                 }));
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsType.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsType.java
new file mode 100644
index 0000000000..921ed44e98
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsType.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Map;
+
+@Getter
+@AllArgsConstructor
+public class EsType {
+
+    public static final String AGGREGATE_METRIC_DOUBLE = 
"aggregate_metric_double";
+    public static final String ALIAS = "alias";
+    public static final String BINARY = "binary";
+    public static final String BYTE = "byte";
+    public static final String BOOLEAN = "boolean";
+    public static final String COMPLETION = "completion";
+    public static final String DATE = "date";
+    public static final String DATE_NANOS = "date_nanos";
+    public static final String DENSE_VECTOR = "dense_vector";
+    public static final String DOUBLE = "double";
+    public static final String FLATTENED = "flattened";
+    public static final String FLOAT = "float";
+    public static final String GEO_POINT = "geo_point";
+    public static final String GEO_SHAPE = "geo_shape";
+    public static final String POINT = "point";
+    public static final String INTEGER_RANGE = "integer_range";
+    public static final String FLOAT_RANGE = "float_range";
+    public static final String LONG_RANGE = "long_range";
+    public static final String DOUBLE_RANGE = "double_range";
+    public static final String DATE_RANGE = "date_range";
+    public static final String IP_RANGE = "ip_range";
+    public static final String HALF_FLOAT = "half_float";
+    public static final String SCALED_FLOAT = "scaled_float";
+    public static final String HISTOGRAM = "histogram";
+    public static final String INTEGER = "integer";
+    public static final String IP = "ip";
+    public static final String JOIN = "join";
+    public static final String KEYWORD = "keyword";
+    public static final String LONG = "long";
+    public static final String NESTED = "nested";
+    public static final String OBJECT = "object";
+    public static final String PERCOLATOR = "percolator";
+    public static final String RANK_FEATURE = "rank_feature";
+    public static final String RANK_FEATURES = "rank_features";
+    public static final String SEARCH_AS_YOU_TYPE = "search_as_you_type";
+    public static final String SHORT = "short";
+    public static final String SHAPE = "shape";
+    public static final String STRING = "string";
+    public static final String SPARSE_VECTOR = "sparse_vector";
+    public static final String TEXT = "text";
+    public static final String MATCH_ONLY_TEXT = "match_only_text";
+    public static final String TOKEN_COUNT = "token_count";
+    public static final String UNSIGNED_LONG = "unsigned_long";
+    public static final String VERSION = "version";
+
+    private String type;
+    private Map<String, Object> options;
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index cb10ed58c0..67226341b5 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -31,8 +31,8 @@ public class IndexInfo {
     private String[] primaryKeys;
     private String keyDelimiter;
 
-    public IndexInfo(ReadonlyConfig config) {
-        index = config.get(SinkConfig.INDEX);
+    public IndexInfo(String index, ReadonlyConfig config) {
+        this.index = index;
         type = config.get(SinkConfig.INDEX_TYPE);
         if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
             primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new 
String[0]);
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 584c373ae7..2f7eb86b91 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -36,6 +36,7 @@ import lombok.NonNull;
 
 import java.time.temporal.Temporal;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
@@ -82,7 +83,7 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
 
     private String serializeUpsert(SeaTunnelRow row) {
         String key = keyExtractor.apply(row);
-        Map<String, Object> document = toDocumentMap(row);
+        Map<String, Object> document = toDocumentMap(row, seaTunnelRowType);
         String documentStr;
 
         try {
@@ -161,22 +162,44 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
                 .toString();
     }
 
-    private Map<String, Object> toDocumentMap(SeaTunnelRow row) {
-        String[] fieldNames = seaTunnelRowType.getFieldNames();
+    private Map<String, Object> toDocumentMap(SeaTunnelRow row, 
SeaTunnelRowType rowType) {
+        String[] fieldNames = rowType.getFieldNames();
         Map<String, Object> doc = new HashMap<>(fieldNames.length);
         Object[] fields = row.getFields();
         for (int i = 0; i < fieldNames.length; i++) {
             Object value = fields[i];
-            if (value instanceof Temporal) {
-                // jackson not support jdk8 new time api
-                doc.put(fieldNames[i], value.toString());
+            if (value == null) {
+            } else if (value instanceof SeaTunnelRow) {
+                doc.put(
+                        fieldNames[i],
+                        toDocumentMap(
+                                (SeaTunnelRow) value, (SeaTunnelRowType) 
rowType.getFieldType(i)));
             } else {
-                doc.put(fieldNames[i], value);
+                doc.put(fieldNames[i], convertValue(value));
             }
         }
         return doc;
     }
 
+    private Object convertValue(Object value) {
+        if (value instanceof Temporal) {
+            // jackson not support jdk8 new time api
+            return value.toString();
+        } else if (value instanceof Map) {
+            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+                ((Map) value).put(entry.getKey(), 
convertValue(entry.getValue()));
+            }
+            return value;
+        } else if (value instanceof List) {
+            for (int i = 0; i < ((List) value).size(); i++) {
+                ((List) value).set(i, convertValue(((List) value).get(i)));
+            }
+            return value;
+        } else {
+            return value;
+        }
+    }
+
     private Map<String, String> createMetadata(@NonNull SeaTunnelRow row, 
@NonNull String key) {
         Map<String, String> actionMetadata = createMetadata(row);
         actionMetadata.put("_id", key);
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 2d23b7dc51..f3acd191ef 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -91,6 +91,9 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
                     put(
                             "yyyy-MM-dd HH:mm:ss.SSSSSS".length(),
                             DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSSSSS"));
+                    put(
+                            "yyyy-MM-dd HH:mm:ss.SSSSSSSSS".length(),
+                            DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSSSSSSSS"));
                 }
             };
 
@@ -185,6 +188,25 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
                 convertMap.put(convertKey, convertValue);
             }
             return convertMap;
+        } else if (fieldType instanceof SeaTunnelRowType) {
+            SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
+            Map<String, Object> collect =
+                    mapper.readValue(fieldValue, new TypeReference<Map<String, 
Object>>() {});
+            Object[] seaTunnelFields = new Object[rowType.getTotalFields()];
+            for (int i = 0; i < rowType.getTotalFields(); i++) {
+                String fieldName = rowType.getFieldName(i);
+                SeaTunnelDataType<?> fieldDataType = rowType.getFieldType(i);
+                Object value = collect.get(fieldName);
+                if (value != null) {
+                    seaTunnelFields[i] =
+                            convertValue(
+                                    fieldDataType,
+                                    (value instanceof List || value instanceof 
Map)
+                                            ? mapper.writeValueAsString(value)
+                                            : value.toString());
+                }
+            }
+            return new SeaTunnelRow(seaTunnelFields);
         } else if (fieldType instanceof PrimitiveByteArrayType) {
             return Base64.getDecoder().decode(fieldValue);
         } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
@@ -204,7 +226,7 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
         } catch (NumberFormatException e) {
             // no op
         }
-        String formatDate = fieldValue.replace("T", " ");
+        String formatDate = fieldValue.replace("T", " ").replace("Z", "");
         if (fieldValue.length() == "yyyyMMdd".length()
                 || fieldValue.length() == "yyyy-MM-dd".length()) {
             formatDate = fieldValue + " 00:00:00";
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index 3d160adc07..d2ca6045eb 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -72,7 +72,7 @@ public class ElasticsearchSink
     public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, 
ElasticsearchSinkState> createWriter(
             SinkWriter.Context context) {
         return new ElasticsearchSinkWriter(
-                context, catalogTable.getSeaTunnelRowType(), config, 
maxBatchSize, maxRetryCount);
+                context, catalogTable, config, maxBatchSize, maxRetryCount);
     }
 
     @Override
@@ -89,7 +89,7 @@ public class ElasticsearchSink
         SchemaSaveMode schemaSaveMode = 
config.get(SinkConfig.SCHEMA_SAVE_MODE);
         DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
 
-        TablePath tablePath = TablePath.of("", config.get(SinkConfig.INDEX));
+        TablePath tablePath = TablePath.of("", 
catalogTable.getTableId().getTableName());
         catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 97548e3fdb..ad2c01e47e 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -26,6 +28,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
@@ -72,6 +75,18 @@ public class ElasticsearchSinkFactory implements 
TableSinkFactory {
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
-        return () -> new ElasticsearchSink(context.getOptions(), 
context.getCatalogTable());
+        String original = context.getOptions().get(INDEX);
+        original =
+                original.replace(
+                        REPLACE_TABLE_NAME_KEY,
+                        context.getCatalogTable().getTableId().getTableName());
+        CatalogTable newTable =
+                CatalogTable.of(
+                        TableIdentifier.of(
+                                context.getCatalogTable().getCatalogName(),
+                                
context.getCatalogTable().getTablePath().getDatabaseName(),
+                                original),
+                        context.getCatalogTable());
+        return () -> new ElasticsearchSink(context.getOptions(), newTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 35ed49d498..6edac760c1 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -19,9 +19,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
@@ -61,18 +61,20 @@ public class ElasticsearchSinkWriter
 
     public ElasticsearchSinkWriter(
             SinkWriter.Context context,
-            SeaTunnelRowType seaTunnelRowType,
+            CatalogTable catalogTable,
             ReadonlyConfig config,
             int maxBatchSize,
             int maxRetryCount) {
         this.context = context;
         this.maxBatchSize = maxBatchSize;
 
-        IndexInfo indexInfo = new IndexInfo(config);
+        IndexInfo indexInfo = new 
IndexInfo(catalogTable.getTableId().getTableName(), config);
         esRestClient = EsRestClient.createInstance(config);
         this.seaTunnelRowSerializer =
                 new ElasticsearchRowSerializer(
-                        esRestClient.getClusterInfo(), indexInfo, 
seaTunnelRowType);
+                        esRestClient.getClusterInfo(),
+                        indexInfo,
+                        catalogTable.getSeaTunnelRowType());
 
         this.requestEsList = new ArrayList<>(maxBatchSize);
         this.retryMaterial =
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index e99e66e420..9909c9bba9 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -30,10 +30,12 @@ import 
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchDataTypeConvertor;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
 
 import java.util.Arrays;
@@ -62,16 +64,14 @@ public class ElasticsearchSource
         } else {
             source = config.get(SourceConfig.SOURCE);
             EsRestClient esRestClient = EsRestClient.createInstance(config);
-            Map<String, String> esFieldType =
+            Map<String, BasicTypeDefine<EsType>> esFieldType =
                     
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
             esRestClient.close();
             SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType[source.size()];
-            ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
-                    new ElasticSearchDataTypeConvertor();
             for (int i = 0; i < source.size(); i++) {
-                String esType = esFieldType.get(source.get(i));
+                BasicTypeDefine<EsType> esType = 
esFieldType.get(source.get(i));
                 SeaTunnelDataType<?> seaTunnelDataType =
-                        
elasticSearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
+                        
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
                 fieldTypes[i] = seaTunnelDataType;
             }
             TableSchema.Builder builder = TableSchema.builder();
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index 6efa5bba4c..5a269e0737 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -49,7 +49,7 @@ public class ElasticsearchRowSerializerTest {
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
-        IndexInfo indexInfo = new IndexInfo(pluginConf);
+        IndexInfo indexInfo = new IndexInfo(index, pluginConf);
         SeaTunnelRowType schema =
                 new SeaTunnelRowType(
                         new String[] {primaryKey, "name"},
@@ -88,7 +88,7 @@ public class ElasticsearchRowSerializerTest {
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
-        IndexInfo indexInfo = new IndexInfo(pluginConf);
+        IndexInfo indexInfo = new IndexInfo(index, pluginConf);
         SeaTunnelRowType schema =
                 new SeaTunnelRowType(
                         new String[] {"id", "name"},
@@ -127,7 +127,7 @@ public class ElasticsearchRowSerializerTest {
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
-        IndexInfo indexInfo = new IndexInfo(pluginConf);
+        IndexInfo indexInfo = new IndexInfo(index, pluginConf);
         SeaTunnelRowType schema =
                 new SeaTunnelRowType(
                         new String[] {primaryKey, "name"},
@@ -165,7 +165,7 @@ public class ElasticsearchRowSerializerTest {
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
-        IndexInfo indexInfo = new IndexInfo(pluginConf);
+        IndexInfo indexInfo = new IndexInfo(index, pluginConf);
         SeaTunnelRowType schema =
                 new SeaTunnelRowType(
                         new String[] {primaryKey, "name"},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ddd106451f..0a8d51c7ab 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -26,10 +26,14 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.commons.io.IOUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -48,6 +52,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -78,7 +83,7 @@ public class ElasticsearchIT extends TestSuiteBase implements 
TestResource {
     public void startUp() throws Exception {
         container =
                 new ElasticsearchContainer(
-                                DockerImageName.parse("elasticsearch:8.0.0")
+                                DockerImageName.parse("elasticsearch:8.9.0")
                                         .asCompatibleSubstituteFor(
                                                 
"docker.elastic.co/elasticsearch/elasticsearch"))
                         .withNetwork(NETWORK)
@@ -89,7 +94,7 @@ public class ElasticsearchIT extends TestSuiteBase implements 
TestResource {
                         .withStartupTimeout(Duration.ofMinutes(5))
                         .withLogConsumer(
                                 new Slf4jLogConsumer(
-                                        
DockerLoggerFactory.getLogger("elasticsearch:8.0.0")));
+                                        
DockerLoggerFactory.getLogger("elasticsearch:8.9.0")));
         Startables.deepStart(Stream.of(container)).join();
         log.info("Elasticsearch container started");
         esRestClient =
@@ -105,6 +110,7 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                         Optional.empty());
         testDataset = generateTestDataSet();
         createIndexDocs();
+        createIndexWithFullType();
     }
 
     /** create a index,and bulk some documents */
@@ -125,6 +131,31 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         esRestClient.bulk(requestBody.toString());
     }
 
+    private void createIndexWithFullType() throws IOException, 
InterruptedException {
+        String mapping =
+                IOUtils.toString(
+                        ContainerUtil.getResourcesFile(
+                                        
"/elasticsearch/st_index_full_type_mapping.json")
+                                .toURI(),
+                        StandardCharsets.UTF_8);
+        esRestClient.createIndex("st_index_full_type", mapping);
+        BulkResponse response =
+                esRestClient.bulk(
+                        "{ \"index\" : { \"_index\" : \"st_index_full_type\", 
\"_id\" : \"1\" } }\n"
+                                + IOUtils.toString(
+                                                ContainerUtil.getResourcesFile(
+                                                                
"/elasticsearch/st_index_full_type_data.json")
+                                                        .toURI(),
+                                                StandardCharsets.UTF_8)
+                                        .replace("\n", "")
+                                + "\n");
+        Assertions.assertFalse(response.isErrors(), response.getResponse());
+        // waiting index refresh
+        Thread.sleep(2000L);
+        Assertions.assertEquals(
+                2, 
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
+    }
+
     @TestTemplate
     public void testElasticsearch(TestContainer container)
             throws IOException, InterruptedException {
@@ -136,6 +167,18 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
+    @TestTemplate
+    public void testElasticsearchWithFullType(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/elasticsearch/elasticsearch_source_and_sink_full_type.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Thread.sleep(2000L);
+        Assertions.assertEquals(
+                1,
+                
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
+    }
+
     private List<String> generateTestDataSet() throws JsonProcessingException {
         String[] fields =
                 new String[] {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_full_type.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_full_type.conf
new file mode 100644
index 0000000000..4c2ca0fae0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_full_type.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  #checkpoint.interval = 10000
+}
+
+source {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200";]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+    index = "st_index_full_type"
+    source = [
+      "aggregate_metric_double",
+      "alias",
+      "binary",
+      "byte",
+      "boolean",
+      "completion",
+      "date",
+      "date_nanos",
+      "dense_vector",
+      "double",
+      "flattened",
+      "float",
+      "geo_point",
+      "geo_shape",
+      "point",
+      "integer_range",
+      "float_range",
+      "long_range",
+      "double_range",
+      "date_range",
+      "ip_range",
+      "half_float",
+      "scaled_float",
+      "histogram",
+      "integer",
+      "ip",
+      "join",
+      "keyword",
+      "long",
+      "nested",
+      "object",
+      "percolator",
+      "rank_feature",
+      "rank_features",
+      "shape",
+      "search_as_you_type",
+      "short",
+      "text",
+      "match_only_text",
+      "name",
+      "unsigned_long",
+      "version"
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200";]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+    index = "st_index_full_type_target"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_data.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_data.json
new file mode 100644
index 0000000000..ace28fbacb
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_data.json
@@ -0,0 +1,137 @@
+{
+  "aggregate_metric_double": {
+    "min": 10,
+    "max": 100,
+    "sum": 1000,
+    "value_count": 5
+  },
+  "binary": "binary_data",
+  "byte": 127,
+  "boolean": true,
+  "completion": {
+    "input": [
+      "search term",
+      "another term"
+    ]
+  },
+  "date": "2024-03-19",
+  "date_nanos": "2024-03-19T12:30:45.123456789Z",
+  "dense_vector": [
+    1.0,
+    2.0,
+    3.0
+  ],
+  "double": 3.14159,
+  "flattened": {
+    "nested_field1": "value1",
+    "nested_field2": "value2"
+  },
+  "float": 3.14,
+  "geo_point": {
+    "lat": 40.7128,
+    "lon": -74.0060
+  },
+  "geo_shape": {
+    "type": "point",
+    "coordinates": [
+      100.0,
+      0.0
+    ]
+  },
+  "point": {
+    "type": "Point",
+    "coordinates": [
+      100.0,
+      0.0
+    ]
+  },
+  "integer_range": {
+    "gte": 10,
+    "lte": 20
+  },
+  "float_range": {
+    "gte": 1.0,
+    "lte": 5.0
+  },
+  "long_range": {
+    "gte": 100,
+    "lte": 200
+  },
+  "double_range": {
+    "gte": 1.0,
+    "lte": 10.0
+  },
+  "date_range": {
+    "gte": "2024-01-01",
+    "lte": "2024-03-31"
+  },
+  "ip_range": {
+    "gte": "192.0.2.0",
+    "lte": "192.0.2.255"
+  },
+  "half_float": 3.14,
+  "scaled_float": 1.23,
+  "histogram": {
+    "values": [
+      0.1,
+      0.2,
+      0.3,
+      0.4,
+      0.5
+    ],
+    "counts": [
+      3,
+      7,
+      23,
+      12,
+      6
+    ]
+  },
+  "integer": 42,
+  "ip": "192.0.2.1",
+  "join": {
+    "name": "question"
+  },
+  "keyword": "keyword_value",
+  "long": 1234567890,
+  "nested": {
+    "nested_field1": "value1",
+    "nested_field2": "value2"
+  },
+  "object": {
+    "age": 30,
+    "name": {
+      "first": "John",
+      "last": "Doe"
+    }
+  },
+  "percolator": {
+    "match": {
+      "keyword": "keyword_value"
+    }
+  },
+  "rank_feature": 5.0,
+  "rank_features": {
+    "feature1": 10.0,
+    "feature2": 20.0
+  },
+  "shape": "POINT (-377.03653 389.897676)",
+  "search_as_you_type": "searchable text",
+  "short": 32767,
+  "sparse_vector": {
+    "index": [
+      0,
+      2,
+      4
+    ],
+    "values": [
+      1.0,
+      2.0,
+      3.0
+    ]
+  },
+  "text": "full text",
+  "match_only_text": "match only text",
+  "name": "John Doe",
+  "version": "1.0"
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_mapping.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_mapping.json
new file mode 100644
index 0000000000..bf7df0d279
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_full_type_mapping.json
@@ -0,0 +1,162 @@
+{
+  "mappings": {
+    "properties": {
+      "aggregate_metric_double": {
+        "type": "aggregate_metric_double",
+        "metrics": [
+          "min",
+          "max",
+          "sum",
+          "value_count"
+        ],
+        "default_metric": "max"
+      },
+      "alias": {
+        "type": "alias",
+        "path": "aggregate_metric_double"
+      },
+      "binary": {
+        "type": "binary"
+      },
+      "byte": {
+        "type": "byte"
+      },
+      "boolean": {
+        "type": "boolean"
+      },
+      "completion": {
+        "type": "completion"
+      },
+      "date": {
+        "type": "date"
+      },
+      "date_nanos": {
+        "type": "date_nanos"
+      },
+      "dense_vector": {
+        "type": "dense_vector",
+        "dims": 3
+      },
+      "double": {
+        "type": "double"
+      },
+      "flattened": {
+        "type": "flattened"
+      },
+      "float": {
+        "type": "float"
+      },
+      "geo_point": {
+        "type": "geo_point"
+      },
+      "geo_shape": {
+        "type": "geo_shape"
+      },
+      "point": {
+        "type": "point"
+      },
+      "integer_range": {
+        "type": "integer_range"
+      },
+      "float_range": {
+        "type": "float_range"
+      },
+      "long_range": {
+        "type": "long_range"
+      },
+      "double_range": {
+        "type": "double_range"
+      },
+      "date_range": {
+        "type": "date_range"
+      },
+      "ip_range": {
+        "type": "ip_range"
+      },
+      "half_float": {
+        "type": "half_float"
+      },
+      "scaled_float": {
+        "type": "scaled_float",
+        "scaling_factor": 100
+      },
+      "histogram": {
+        "type": "histogram"
+      },
+      "integer": {
+        "type": "integer"
+      },
+      "ip": {
+        "type": "ip"
+      },
+      "join": {
+        "type": "join",
+        "relations": {
+          "question": "answer"
+        }
+      },
+      "keyword": {
+        "type": "keyword"
+      },
+      "long": {
+        "type": "long"
+      },
+      "nested": {
+        "type": "nested"
+      },
+      "object": {
+        "properties": {
+          "age": {
+            "type": "integer"
+          },
+          "name": {
+            "properties": {
+              "first": {
+                "type": "text"
+              },
+              "last": {
+                "type": "text"
+              }
+            }
+          }
+        }
+      },
+      "percolator": {
+        "type": "percolator"
+      },
+      "rank_feature": {
+        "type": "rank_feature"
+      },
+      "rank_features": {
+        "type": "rank_features"
+      },
+      "shape": {
+        "type": "shape"
+      },
+      "search_as_you_type": {
+        "type": "search_as_you_type"
+      },
+      "short": {
+        "type": "short"
+      },
+      "text": {
+        "type": "text"
+      },
+      "match_only_text": {
+        "type": "text"
+      },
+      "name": {
+        "type": "text",
+        "fields": {
+          "length": {
+            "type": "token_count",
+            "analyzer": "standard"
+          }
+        }
+      },
+      "version": {
+        "type": "version"
+      }
+    }
+  }
+}
\ No newline at end of file

Reply via email to