This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4e98eb8639 [Bug][Improve][Connector-v2][ElasticsearchSource] Fix 
behavior when source empty,Support SourceConfig.SOURCE field empty. (#6425)
4e98eb8639 is described below

commit 4e98eb8639d6ca91537387c1bb1693de686d54d2
Author: CosmosNi <[email protected]>
AuthorDate: Mon Apr 29 09:42:12 2024 +0800

    [Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source 
empty,Support SourceConfig.SOURCE field empty. (#6425)
---
 docs/en/connector-v2/source/Elasticsearch.md       | 14 ++--
 .../elasticsearch/config/SourceConfig.java         |  7 ++
 .../source/DefaultSeaTunnelRowDeserializer.java    |  5 +-
 .../elasticsearch/source/ElasticsearchSource.java  | 47 +++++++++++--
 .../elasticsearch/ElasticsearchSourceTest.java     | 49 +++++++++++++
 .../connector/elasticsearch/ElasticsearchIT.java   | 82 ++++++++++++++++++++--
 .../elasticsearch_source_and_sink.conf             |  4 +-
 ...sticsearch_source_without_schema_and_sink.conf} | 30 +++-----
 .../st_index_source_without_schema_and_sink.json   | 64 +++++++++++++++++
 9 files changed, 259 insertions(+), 43 deletions(-)

diff --git a/docs/en/connector-v2/source/Elasticsearch.md 
b/docs/en/connector-v2/source/Elasticsearch.md
index 461787e6f6..62ddfc5487 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -29,9 +29,9 @@ support version >= 2.x and <= 8.x.
 | query                   | json    | no       | {"match_all": {}} |
 | scroll_time             | string  | no       | 1m                |
 | scroll_size             | int     | no       | 100               |
-| schema                  |         | no       | -                 |
 | tls_verify_certificate  | boolean | no       | true              |
 | tls_verify_hostnames    | boolean | no       | true              |
+| array_column            | map     | no       |                   |
 | tls_keystore_path       | string  | no       | -                 |
 | tls_keystore_password   | string  | no       | -                 |
 | tls_truststore_path     | string  | no       | -                 |
@@ -58,7 +58,12 @@ Elasticsearch index name, support * fuzzy matching.
 
 The fields of index.
 You can get the document id by specifying the field `_id`.If sink _id to other 
index,you need specify an alias for _id due to the Elasticsearch limit.
-If you don't config source, you must config `schema`.
+If you don't configure `source`, the fields are automatically retrieved from the 
mapping of the index.
+
+### array_column [map]
+
+The fields of array type.
+Elasticsearch mappings have no array type, so array fields must be declared here, for example 
`{c_array = "array<tinyint>"}`.
 
 ### query [json]
 
@@ -73,11 +78,6 @@ Amount of time Elasticsearch will keep the search context 
alive for scroll reque
 
 Maximum number of hits to be returned with each Elasticsearch scroll request.
 
-### schema
-
-The structure of the data, including field names and field types.
-If you don't config schema, you must config `source`.
-
 ### tls_verify_certificate [boolean]
 
 Enable certificates validation for HTTPS endpoints
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
index 81c32bdf15..c63cd37595 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
@@ -40,6 +40,13 @@ public class SourceConfig {
                     .withDescription(
                             "The fields of index. You can get the document id 
by specifying the field _id.If sink _id to other index,you need specify an 
alias for _id due to the Elasticsearch limit");
 
+    public static final Option<Map<String, String>> ARRAY_COLUMN =
+            Options.key("array_column")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription(
+                            "Because there is no array type in es,so need 
specify array Type.");
+
     public static final Option<String> SCROLL_TIME =
             Options.key("scroll_time")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index f3acd191ef..aab2db822a 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.NullNode;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 
@@ -117,7 +118,9 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
                 value = recursiveGet(rowRecord.getDoc(), fieldName);
                 if (value != null) {
                     seaTunnelDataType = rowTypeInfo.getFieldType(i);
-                    if (value instanceof TextNode) {
+                    if (value instanceof NullNode) {
+                        seaTunnelFields[i] = null;
+                    } else if (value instanceof TextNode) {
                         seaTunnelFields[i] =
                                 convertValue(seaTunnelDataType, ((TextNode) 
value).textValue());
                     } else {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index 9909c9bba9..7b153f0be3 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -27,6 +29,7 @@ import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -38,11 +41,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClie
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
 
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+@Slf4j
 public class ElasticsearchSource
         implements SeaTunnelSource<
                         SeaTunnelRow, ElasticsearchSourceSplit, 
ElasticsearchSourceState>,
@@ -55,27 +64,40 @@ public class ElasticsearchSource
 
     private List<String> source;
 
+    private Map<String, String> arrayColumn;
+
     public ElasticsearchSource(ReadonlyConfig config) {
         this.config = config;
         if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
             // todo: We need to remove the schema in ES.
+            log.warn(
+                    "The schema config in ElasticSearch sink is deprecated, 
please use source config instead!");
             catalogTable = CatalogTableUtil.buildWithConfig(config);
             source = 
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
         } else {
             source = config.get(SourceConfig.SOURCE);
+            arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
             EsRestClient esRestClient = EsRestClient.createInstance(config);
             Map<String, BasicTypeDefine<EsType>> esFieldType =
                     
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
             esRestClient.close();
-            SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType[source.size()];
-            for (int i = 0; i < source.size(); i++) {
-                BasicTypeDefine<EsType> esType = 
esFieldType.get(source.get(i));
-                SeaTunnelDataType<?> seaTunnelDataType =
-                        
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
-                fieldTypes[i] = seaTunnelDataType;
+
+            if (CollectionUtils.isEmpty(source)) {
+                source = new ArrayList<>(esFieldType.keySet());
             }
+            SeaTunnelDataType[] fieldTypes = getSeaTunnelDataType(esFieldType, 
source);
             TableSchema.Builder builder = TableSchema.builder();
+
             for (int i = 0; i < source.size(); i++) {
+                String key = source.get(i);
+                if (arrayColumn.containsKey(key)) {
+                    String value = arrayColumn.get(key);
+                    SeaTunnelDataType<?> dataType =
+                            
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
+                    builder.column(PhysicalColumn.of(key, dataType, 0, true, 
null, null));
+                    continue;
+                }
+
                 builder.column(
                         PhysicalColumn.of(source.get(i), fieldTypes[i], 0, 
true, null, null));
             }
@@ -127,4 +149,17 @@ public class ElasticsearchSource
         return new ElasticsearchSourceSplitEnumerator(
                 enumeratorContext, sourceState, config, source);
     }
+
+    @VisibleForTesting
+    public static SeaTunnelDataType[] getSeaTunnelDataType(
+            Map<String, BasicTypeDefine<EsType>> esFieldType, List<String> 
source) {
+        SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType[source.size()];
+        for (int i = 0; i < source.size(); i++) {
+            BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
+            SeaTunnelDataType<?> seaTunnelDataType =
+                    
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
+            fieldTypes[i] = seaTunnelDataType;
+        }
+        return fieldTypes;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/ElasticsearchSourceTest.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/ElasticsearchSourceTest.java
new file mode 100644
index 0000000000..9c92fe6039
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/ElasticsearchSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ElasticsearchSourceTest {
+    @Test
+    public void testPrepareWithEmptySource() throws PrepareFailException {
+        BasicTypeDefine.BasicTypeDefineBuilder<EsType> typeDefine =
+                BasicTypeDefine.<EsType>builder()
+                        .name("field1")
+                        .columnType("text")
+                        .dataType("text");
+        Map<String, BasicTypeDefine<EsType>> esFieldType = new HashMap<>();
+        esFieldType.put("field1", typeDefine.build());
+        SeaTunnelDataType[] seaTunnelDataTypes =
+                ElasticsearchSource.getSeaTunnelDataType(
+                        esFieldType, new ArrayList<>(esFieldType.keySet()));
+        Assertions.assertNotNull(seaTunnelDataTypes);
+        Assertions.assertEquals(seaTunnelDataTypes[0].getTypeClass(), 
String.class);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 0a8d51c7ab..b754ea425a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -23,9 +23,11 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
@@ -57,6 +59,7 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -111,6 +114,7 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         testDataset = generateTestDataSet();
         createIndexDocs();
         createIndexWithFullType();
+        createIndexForResourceNull();
     }
 
     /** create a index,and bulk some documents */
@@ -156,6 +160,16 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                 2, 
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
     }
 
+    private void createIndexForResourceNull() throws IOException {
+        String mapping =
+                IOUtils.toString(
+                        ContainerUtil.getResourcesFile(
+                                        
"/elasticsearch/st_index_source_without_schema_and_sink.json")
+                                .toURI(),
+                        StandardCharsets.UTF_8);
+        esRestClient.createIndex("st_index4", mapping);
+    }
+
     @TestTemplate
     public void testElasticsearch(TestContainer container)
             throws IOException, InterruptedException {
@@ -179,6 +193,19 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                 
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
     }
 
+    @TestTemplate
+    public void testElasticsearchWithoutSchema(TestContainer container)
+            throws IOException, InterruptedException {
+
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/elasticsearch/elasticsearch_source_without_schema_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        List<String> sinkData = readSinkDataWithOutSchema();
+        // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
+        Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
+    }
+
     private List<String> generateTestDataSet() throws JsonProcessingException {
         String[] fields =
                 new String[] {
@@ -188,12 +215,12 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                     "c_boolean",
                     "c_tinyint",
                     "c_smallint",
-                    "c_int",
                     "c_bigint",
                     "c_float",
                     "c_double",
                     "c_decimal",
                     "c_bytes",
+                    "c_int",
                     "c_date",
                     "c_timestamp"
                 };
@@ -227,6 +254,14 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         return documents;
     }
 
+    private List<String> readSinkDataWithOutSchema() throws 
InterruptedException {
+        Map<String, BasicTypeDefine<EsType>> esFieldType =
+                esRestClient.getFieldTypeMapping("st_index4", 
Lists.newArrayList());
+        Thread.sleep(2000);
+        List<String> source = new ArrayList<>(esFieldType.keySet());
+        return getDocsWithTransformDate(source, "st_index4");
+    }
+
     private List<String> readSinkData() throws InterruptedException {
         // wait for index refresh
         Thread.sleep(2000);
@@ -238,14 +273,18 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                         "c_boolean",
                         "c_tinyint",
                         "c_smallint",
-                        "c_int",
                         "c_bigint",
                         "c_float",
                         "c_double",
                         "c_decimal",
                         "c_bytes",
+                        "c_int",
                         "c_date",
                         "c_timestamp");
+        return getDocsWithTransformTimestamp(source, "st_index2");
+    }
+
+    private List<String> getDocsWithTransformTimestamp(List<String> source, 
String index) {
         HashMap<String, Object> rangeParam = new HashMap<>();
         rangeParam.put("gte", 10);
         rangeParam.put("lte", 20);
@@ -253,8 +292,7 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         range.put("c_int", rangeParam);
         Map<String, Object> query = new HashMap<>();
         query.put("range", range);
-        ScrollResult scrollResult =
-                esRestClient.searchByScroll("st_index2", source, query, "1m", 
1000);
+        ScrollResult scrollResult = esRestClient.searchByScroll(index, source, 
query, "1m", 1000);
         scrollResult
                 .getDocs()
                 .forEach(
@@ -262,8 +300,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                             x.remove("_index");
                             x.remove("_type");
                             x.remove("_id");
-                            // I don’t know if converting the test cases in 
this way complies with
-                            // the CI specification
                             x.replace(
                                     "c_timestamp",
                                     
LocalDateTime.parse(x.get("c_timestamp").toString())
@@ -280,6 +316,40 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         return docs;
     }
 
+    private List<String> getDocsWithTransformDate(List<String> source, String 
index) {
+        HashMap<String, Object> rangeParam = new HashMap<>();
+        rangeParam.put("gte", 10);
+        rangeParam.put("lte", 20);
+        HashMap<String, Object> range = new HashMap<>();
+        range.put("c_int", rangeParam);
+        Map<String, Object> query = new HashMap<>();
+        query.put("range", range);
+        ScrollResult scrollResult = esRestClient.searchByScroll(index, source, 
query, "1m", 1000);
+        scrollResult
+                .getDocs()
+                .forEach(
+                        x -> {
+                            x.remove("_index");
+                            x.remove("_type");
+                            x.remove("_id");
+                            x.replace(
+                                    "c_date",
+                                    LocalDate.parse(
+                                                    x.get("c_date").toString(),
+                                                    
DateTimeFormatter.ofPattern(
+                                                            
"yyyy-MM-dd'T'HH:mm"))
+                                            .toString());
+                        });
+        List<String> docs =
+                scrollResult.getDocs().stream()
+                        .sorted(
+                                Comparator.comparingInt(
+                                        o -> 
Integer.valueOf(o.get("c_int").toString())))
+                        .map(JsonUtils::toJsonString)
+                        .collect(Collectors.toList());
+        return docs;
+    }
+
     private List<String> mapTestDatasetForDSL() {
         return testDataset.stream()
                 .map(JsonUtils::parseObject)
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index c4f0bb91ff..c6c668e469 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -43,12 +43,12 @@ source {
         c_boolean = boolean
         c_tinyint = tinyint
         c_smallint = smallint
-        c_int = int
         c_bigint = bigint
         c_float = float
         c_double = double
         c_decimal = "decimal(2, 1)"
         c_bytes = bytes
+        c_int = int
         c_date = date
         c_timestamp = timestamp
       }
@@ -72,4 +72,4 @@ sink {
     "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
     "data_save_mode"="APPEND_DATA"
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_without_schema_and_sink.conf
similarity index 77%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_without_schema_and_sink.conf
index c4f0bb91ff..94e737c661 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_without_schema_and_sink.conf
@@ -32,27 +32,15 @@ source {
     password = "elasticsearch"
     tls_verify_certificate = false
     tls_verify_hostname = false
-
     index = "st_index"
+       source = []
+       array_column = {
+           c_array = "array<tinyint>"
+       }
     query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
-    schema = {
-      fields {
-        c_map = "map<string, tinyint>"
-        c_array = "array<tinyint>"
-        c_string = string
-        c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
-        c_int = int
-        c_bigint = bigint
-        c_float = float
-        c_double = double
-        c_decimal = "decimal(2, 1)"
-        c_bytes = bytes
-        c_date = date
-        c_timestamp = timestamp
-      }
-    }
+    es.mapping.date.rich = "false"
+    es.read.field.exclude = "reqparams.header"
+    es.read.field.as.array.include = "c_array"
   }
 }
 
@@ -67,9 +55,9 @@ sink {
     tls_verify_certificate = false
     tls_verify_hostname = false
 
-    index = "st_index2"
+    index = "st_index4"
     index_type = "st"
     "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
     "data_save_mode"="APPEND_DATA"
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
new file mode 100644
index 0000000000..fc1adb3513
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
@@ -0,0 +1,64 @@
+{
+        "mappings": {
+            "properties": {
+                "c_array": {
+                    "type": "long"
+                },
+                "c_bigint": {
+                    "type": "long"
+                },
+                "c_boolean": {
+                    "type": "boolean"
+                },
+                "c_bytes": {
+                    "type": "text",
+                    "fields": {
+                        "keyword": {
+                            "type": "keyword",
+                            "ignore_above": 256
+                        }
+                    }
+                },
+                "c_date": {
+                    "type": "date"
+                },
+                "c_decimal": {
+                    "type": "float"
+                },
+                "c_double": {
+                    "type": "float"
+                },
+                "c_float": {
+                    "type": "float"
+                },
+                "c_int": {
+                    "type": "long"
+                },
+                "c_map": {
+                    "properties": {
+                        "key": {
+                            "type": "long"
+                        }
+                    }
+                },
+                "c_smallint": {
+                    "type": "long"
+                },
+                "c_string": {
+                    "type": "text",
+                    "fields": {
+                        "keyword": {
+                            "type": "keyword",
+                            "ignore_above": 256
+                        }
+                    }
+                },
+                "c_timestamp": {
+                    "type": "long"
+                },
+                "c_tinyint": {
+                    "type": "long"
+                }
+            }
+        }
+    }

Reply via email to