This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4e98eb8639 [Bug][Improve][Connector-v2][ElasticsearchSource] Fix
behavior when source empty,Support SourceConfig.SOURCE field empty. (#6425)
4e98eb8639 is described below
commit 4e98eb8639d6ca91537387c1bb1693de686d54d2
Author: CosmosNi <[email protected]>
AuthorDate: Mon Apr 29 09:42:12 2024 +0800
[Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source
empty,Support SourceConfig.SOURCE field empty. (#6425)
---
docs/en/connector-v2/source/Elasticsearch.md | 14 ++--
.../elasticsearch/config/SourceConfig.java | 7 ++
.../source/DefaultSeaTunnelRowDeserializer.java | 5 +-
.../elasticsearch/source/ElasticsearchSource.java | 47 +++++++++++--
.../elasticsearch/ElasticsearchSourceTest.java | 49 +++++++++++++
.../connector/elasticsearch/ElasticsearchIT.java | 82 ++++++++++++++++++++--
.../elasticsearch_source_and_sink.conf | 4 +-
...sticsearch_source_without_schema_and_sink.conf} | 30 +++-----
.../st_index_source_without_schema_and_sink.json | 64 +++++++++++++++++
9 files changed, 259 insertions(+), 43 deletions(-)
diff --git a/docs/en/connector-v2/source/Elasticsearch.md
b/docs/en/connector-v2/source/Elasticsearch.md
index 461787e6f6..62ddfc5487 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -29,9 +29,9 @@ support version >= 2.x and <= 8.x.
| query | json | no | {"match_all": {}} |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
-| schema | | no | - |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
+| array_column | map | no | |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
@@ -58,7 +58,12 @@ Elasticsearch index name, support * fuzzy matching.
The fields of index.
You can get the document id by specifying the field `_id`.If sink _id to other
index,you need specify an alias for _id due to the Elasticsearch limit.
-If you don't config source, you must config `schema`.
+If you don't configure `source`, the fields are automatically retrieved from the
mapping of the index.
+
+### array_column [map]
+
+The fields of array type, given as a map from field name to array type.
+Elasticsearch has no dedicated array type, so array fields must be declared explicitly, e.g.
`{c_array = "array<tinyint>"}`.
### query [json]
@@ -73,11 +78,6 @@ Amount of time Elasticsearch will keep the search context
alive for scroll reque
Maximum number of hits to be returned with each Elasticsearch scroll request.
-### schema
-
-The structure of the data, including field names and field types.
-If you don't config schema, you must config `source`.
-
### tls_verify_certificate [boolean]
Enable certificates validation for HTTPS endpoints
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
index 81c32bdf15..c63cd37595 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
@@ -40,6 +40,13 @@ public class SourceConfig {
.withDescription(
"The fields of index. You can get the document id
by specifying the field _id.If sink _id to other index,you need specify an
alias for _id due to the Elasticsearch limit");
+ public static final Option<Map<String, String>> ARRAY_COLUMN =
+ Options.key("array_column")
+ .mapType()
+ .defaultValue(new HashMap<>())
+ .withDescription(
+ "Because there is no array type in es,so need
specify array Type.");
+
public static final Option<String> SCROLL_TIME =
Options.key("scroll_time")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index f3acd191ef..aab2db822a 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.NullNode;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
@@ -117,7 +118,9 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
- if (value instanceof TextNode) {
+ if (value instanceof NullNode) {
+ seaTunnelFields[i] = null;
+ } else if (value instanceof TextNode) {
seaTunnelFields[i] =
convertValue(seaTunnelDataType, ((TextNode)
value).textValue());
} else {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index 9909c9bba9..7b153f0be3 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -27,6 +29,7 @@ import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -38,11 +41,17 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClie
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+@Slf4j
public class ElasticsearchSource
implements SeaTunnelSource<
SeaTunnelRow, ElasticsearchSourceSplit,
ElasticsearchSourceState>,
@@ -55,27 +64,40 @@ public class ElasticsearchSource
private List<String> source;
+ private Map<String, String> arrayColumn;
+
public ElasticsearchSource(ReadonlyConfig config) {
this.config = config;
if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
// todo: We need to remove the schema in ES.
+ log.warn(
+ "The schema config in ElasticSearch sink is deprecated,
please use source config instead!");
catalogTable = CatalogTableUtil.buildWithConfig(config);
source =
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
} else {
source = config.get(SourceConfig.SOURCE);
+ arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
EsRestClient esRestClient = EsRestClient.createInstance(config);
Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
esRestClient.close();
- SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType[source.size()];
- for (int i = 0; i < source.size(); i++) {
- BasicTypeDefine<EsType> esType =
esFieldType.get(source.get(i));
- SeaTunnelDataType<?> seaTunnelDataType =
-
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
- fieldTypes[i] = seaTunnelDataType;
+
+ if (CollectionUtils.isEmpty(source)) {
+ source = new ArrayList<>(esFieldType.keySet());
}
+ SeaTunnelDataType[] fieldTypes = getSeaTunnelDataType(esFieldType,
source);
TableSchema.Builder builder = TableSchema.builder();
+
for (int i = 0; i < source.size(); i++) {
+ String key = source.get(i);
+ if (arrayColumn.containsKey(key)) {
+ String value = arrayColumn.get(key);
+ SeaTunnelDataType<?> dataType =
+
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
+ builder.column(PhysicalColumn.of(key, dataType, 0, true,
null, null));
+ continue;
+ }
+
builder.column(
PhysicalColumn.of(source.get(i), fieldTypes[i], 0,
true, null, null));
}
@@ -127,4 +149,17 @@ public class ElasticsearchSource
return new ElasticsearchSourceSplitEnumerator(
enumeratorContext, sourceState, config, source);
}
+
+ @VisibleForTesting
+ public static SeaTunnelDataType[] getSeaTunnelDataType(
+ Map<String, BasicTypeDefine<EsType>> esFieldType, List<String>
source) {
+ SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType[source.size()];
+ for (int i = 0; i < source.size(); i++) {
+ BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
+ SeaTunnelDataType<?> seaTunnelDataType =
+
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
+ fieldTypes[i] = seaTunnelDataType;
+ }
+ return fieldTypes;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/ElasticsearchSourceTest.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/ElasticsearchSourceTest.java
new file mode 100644
index 0000000000..9c92fe6039
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/ElasticsearchSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ElasticsearchSourceTest {
+ @Test
+ public void testPrepareWithEmptySource() throws PrepareFailException {
+ BasicTypeDefine.BasicTypeDefineBuilder<EsType> typeDefine =
+ BasicTypeDefine.<EsType>builder()
+ .name("field1")
+ .columnType("text")
+ .dataType("text");
+ Map<String, BasicTypeDefine<EsType>> esFieldType = new HashMap<>();
+ esFieldType.put("field1", typeDefine.build());
+ SeaTunnelDataType[] seaTunnelDataTypes =
+ ElasticsearchSource.getSeaTunnelDataType(
+ esFieldType, new ArrayList<>(esFieldType.keySet()));
+ Assertions.assertNotNull(seaTunnelDataTypes);
+ Assertions.assertEquals(seaTunnelDataTypes[0].getTypeClass(),
String.class);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 0a8d51c7ab..b754ea425a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -23,9 +23,11 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
@@ -57,6 +59,7 @@ import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -111,6 +114,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
testDataset = generateTestDataSet();
createIndexDocs();
createIndexWithFullType();
+ createIndexForResourceNull();
}
/** create a index,and bulk some documents */
@@ -156,6 +160,16 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
2,
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
}
+ private void createIndexForResourceNull() throws IOException {
+ String mapping =
+ IOUtils.toString(
+ ContainerUtil.getResourcesFile(
+
"/elasticsearch/st_index_source_without_schema_and_sink.json")
+ .toURI(),
+ StandardCharsets.UTF_8);
+ esRestClient.createIndex("st_index4", mapping);
+ }
+
@TestTemplate
public void testElasticsearch(TestContainer container)
throws IOException, InterruptedException {
@@ -179,6 +193,19 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
}
+ @TestTemplate
+ public void testElasticsearchWithoutSchema(TestContainer container)
+ throws IOException, InterruptedException {
+
+ Container.ExecResult execResult =
+ container.executeJob(
+
"/elasticsearch/elasticsearch_source_without_schema_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> sinkData = readSinkDataWithOutSchema();
+ // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
+ Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
+ }
+
private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
@@ -188,12 +215,12 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
"c_boolean",
"c_tinyint",
"c_smallint",
- "c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
+ "c_int",
"c_date",
"c_timestamp"
};
@@ -227,6 +254,14 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
return documents;
}
+ private List<String> readSinkDataWithOutSchema() throws
InterruptedException {
+ Map<String, BasicTypeDefine<EsType>> esFieldType =
+ esRestClient.getFieldTypeMapping("st_index4",
Lists.newArrayList());
+ Thread.sleep(2000);
+ List<String> source = new ArrayList<>(esFieldType.keySet());
+ return getDocsWithTransformDate(source, "st_index4");
+ }
+
private List<String> readSinkData() throws InterruptedException {
// wait for index refresh
Thread.sleep(2000);
@@ -238,14 +273,18 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
"c_boolean",
"c_tinyint",
"c_smallint",
- "c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
+ "c_int",
"c_date",
"c_timestamp");
+ return getDocsWithTransformTimestamp(source, "st_index2");
+ }
+
+ private List<String> getDocsWithTransformTimestamp(List<String> source,
String index) {
HashMap<String, Object> rangeParam = new HashMap<>();
rangeParam.put("gte", 10);
rangeParam.put("lte", 20);
@@ -253,8 +292,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
range.put("c_int", rangeParam);
Map<String, Object> query = new HashMap<>();
query.put("range", range);
- ScrollResult scrollResult =
- esRestClient.searchByScroll("st_index2", source, query, "1m",
1000);
+ ScrollResult scrollResult = esRestClient.searchByScroll(index, source,
query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
@@ -262,8 +300,6 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
x.remove("_index");
x.remove("_type");
x.remove("_id");
- // I don’t know if converting the test cases in
this way complies with
- // the CI specification
x.replace(
"c_timestamp",
LocalDateTime.parse(x.get("c_timestamp").toString())
@@ -280,6 +316,40 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
return docs;
}
+ private List<String> getDocsWithTransformDate(List<String> source, String
index) {
+ HashMap<String, Object> rangeParam = new HashMap<>();
+ rangeParam.put("gte", 10);
+ rangeParam.put("lte", 20);
+ HashMap<String, Object> range = new HashMap<>();
+ range.put("c_int", rangeParam);
+ Map<String, Object> query = new HashMap<>();
+ query.put("range", range);
+ ScrollResult scrollResult = esRestClient.searchByScroll(index, source,
query, "1m", 1000);
+ scrollResult
+ .getDocs()
+ .forEach(
+ x -> {
+ x.remove("_index");
+ x.remove("_type");
+ x.remove("_id");
+ x.replace(
+ "c_date",
+ LocalDate.parse(
+ x.get("c_date").toString(),
+
DateTimeFormatter.ofPattern(
+
"yyyy-MM-dd'T'HH:mm"))
+ .toString());
+ });
+ List<String> docs =
+ scrollResult.getDocs().stream()
+ .sorted(
+ Comparator.comparingInt(
+ o ->
Integer.valueOf(o.get("c_int").toString())))
+ .map(JsonUtils::toJsonString)
+ .collect(Collectors.toList());
+ return docs;
+ }
+
private List<String> mapTestDatasetForDSL() {
return testDataset.stream()
.map(JsonUtils::parseObject)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index c4f0bb91ff..c6c668e469 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -43,12 +43,12 @@ source {
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
- c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
+ c_int = int
c_date = date
c_timestamp = timestamp
}
@@ -72,4 +72,4 @@ sink {
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
"data_save_mode"="APPEND_DATA"
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_without_schema_and_sink.conf
similarity index 77%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_without_schema_and_sink.conf
index c4f0bb91ff..94e737c661 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_without_schema_and_sink.conf
@@ -32,27 +32,15 @@ source {
password = "elasticsearch"
tls_verify_certificate = false
tls_verify_hostname = false
-
index = "st_index"
+ source = []
+ array_column = {
+ c_array = "array<tinyint>"
+ }
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
- schema = {
- fields {
- c_map = "map<string, tinyint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ es.mapping.date.rich = "false"
+ es.read.field.exclude = "reqparams.header"
+ es.read.field.as.array.include = "c_array"
}
}
@@ -67,9 +55,9 @@ sink {
tls_verify_certificate = false
tls_verify_hostname = false
- index = "st_index2"
+ index = "st_index4"
index_type = "st"
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
"data_save_mode"="APPEND_DATA"
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
new file mode 100644
index 0000000000..fc1adb3513
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
@@ -0,0 +1,64 @@
+{
+ "mappings": {
+ "properties": {
+ "c_array": {
+ "type": "long"
+ },
+ "c_bigint": {
+ "type": "long"
+ },
+ "c_boolean": {
+ "type": "boolean"
+ },
+ "c_bytes": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "c_date": {
+ "type": "date"
+ },
+ "c_decimal": {
+ "type": "float"
+ },
+ "c_double": {
+ "type": "float"
+ },
+ "c_float": {
+ "type": "float"
+ },
+ "c_int": {
+ "type": "long"
+ },
+ "c_map": {
+ "properties": {
+ "key": {
+ "type": "long"
+ }
+ }
+ },
+ "c_smallint": {
+ "type": "long"
+ },
+ "c_string": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "c_timestamp": {
+ "type": "long"
+ },
+ "c_tinyint": {
+ "type": "long"
+ }
+ }
+ }
+ }