This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eaa15e4c8d [Feature][connector-elasticsearch] elasticsearch support
nested type (#8462)
eaa15e4c8d is described below
commit eaa15e4c8d72c8e002c4e6b663776ede4e2505b8
Author: CosmosNi <[email protected]>
AuthorDate: Tue Jan 7 18:58:32 2025 +0800
[Feature][connector-elasticsearch] elasticsearch support nested type (#8462)
---
.../apache/seatunnel/api/table/type/ArrayType.java | 2 +-
.../seatunnel/api/table/type/SeaTunnelRow.java | 10 +++
.../catalog/ElasticSearchTypeConverter.java | 8 ++-
.../source/DefaultSeaTunnelRowDeserializer.java | 30 +++++++++
.../connector/elasticsearch/ElasticsearchIT.java | 74 ++++++++++++++++++++++
.../elasticsearch_source_and_sink_with_nest.conf | 53 ++++++++++++++++
.../elasticsearch/st_index_nest_data.json | 15 +++++
.../elasticsearch/st_index_nest_mapping.json | 23 +++++++
8 files changed, 212 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 36c3362108..65f7651e79 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -56,7 +56,7 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
private final Class<T> arrayClass;
private final SeaTunnelDataType<E> elementType;
- protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType)
{
+ public ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
this.arrayClass = arrayClass;
this.elementType = elementType;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 39f61aee5d..5c253b4cf5 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -305,6 +305,16 @@ public final class SeaTunnelRow implements Serializable {
return getBytesForArray(v, BasicType.FLOAT_TYPE);
case "Double[]":
return getBytesForArray(v, BasicType.DOUBLE_TYPE);
+ case "Map[]":
+ int sizeMaps = 0;
+ for (Map o : (Map[]) v) {
+ for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
+ sizeMaps +=
+ getBytesForValue(entry.getKey())
+ + getBytesForValue(entry.getValue());
+ }
+ }
+ return sizeMaps;
case "HashMap":
case "LinkedHashMap":
int size = 0;
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
index 412342cb82..d92d083998 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
@@ -64,7 +64,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsT
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
@@ -150,6 +149,12 @@ public class ElasticSearchTypeConverter implements
BasicTypeConverter<BasicTypeD
});
builder.dataType(rowType);
break;
+ case EsType.NESTED:
+ builder.dataType(
+ new ArrayType<>(
+ Map[].class,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE)));
+ break;
case INTEGER:
case TOKEN_COUNT:
builder.dataType(BasicType.INT_TYPE);
@@ -207,7 +212,6 @@ public class ElasticSearchTypeConverter implements
BasicTypeConverter<BasicTypeD
case COMPLETION:
case STRING:
case GEO_SHAPE:
- case NESTED:
case PERCOLATOR:
case POINT:
case RANK_FEATURES:
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index fd176f2f03..f39aeae8fd 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -46,6 +46,7 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
@@ -177,6 +178,35 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
} else if (fieldType instanceof ArrayType) {
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
SeaTunnelDataType<?> elementType = arrayType.getElementType();
+ if (elementType instanceof MapType) {
+ MapType<?, ?> mapType = (MapType<?, ?>) elementType;
+ List<Map> mapList = JsonUtils.toList(fieldValue,
Map.class);
+ Object arr = Array.newInstance(elementType.getTypeClass(),
mapList.size());
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ for (int i = 0; i < mapList.size(); i++) {
+ Map<String, String> map = mapList.get(i);
+ Map<Object, Object> convertMap = new HashMap<>();
+ for (Map.Entry entry : map.entrySet()) {
+ Object convertKey =
+ convertValue(
+ keyType,
+ Objects.isNull(entry.getKey())
+ ? null
+ :
String.valueOf(entry.getKey()));
+ Object convertValue =
+ convertValue(
+ valueType,
+ Objects.isNull(entry.getValue())
+ ? null
+ :
String.valueOf(entry.getValue()));
+ convertMap.put(convertKey, convertValue);
+ }
+ Array.set(arr, i, convertMap);
+ }
+ return arr;
+ }
+
List<String> stringList = JsonUtils.toList(fieldValue,
String.class);
Object arr = Array.newInstance(elementType.getTypeClass(),
stringList.size());
for (int i = 0; i < stringList.size(); i++) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 87730fee46..fa805d851b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -133,6 +133,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
createIndexDocs();
createIndexWithFullType();
createIndexForResourceNull("st_index4");
+ createIndexWithNestType();
}
/** create a index,and bulk some documents */
@@ -156,6 +157,31 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
esRestClient.bulk(requestBody.toString());
}
+ /**
+ * Creates the nested-type source index st_index_nest and the sink index st_index_nest_copy
+ * from the same mapping file, then bulk-indexes one document (id 1) read from
+ * st_index_nest_data.json into the source index.
+ */
+ private void createIndexWithNestType() throws IOException,
InterruptedException {
+ String mapping =
+ IOUtils.toString(
+
ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json")
+ .toURI(),
+ StandardCharsets.UTF_8);
+ // the sink index reuses the source mapping so "address" is also a nested field there
+ esRestClient.createIndex("st_index_nest", mapping);
+ esRestClient.createIndex("st_index_nest_copy", mapping);
+ // bulk body = action line + the pretty-printed document flattened onto a single line
+ BulkResponse response =
+ esRestClient.bulk(
+ "{ \"index\" : { \"_index\" : \"st_index_nest\",
\"_id\" : \"1\" } }\n"
+ + IOUtils.toString(
+ ContainerUtil.getResourcesFile(
+
"/elasticsearch/st_index_nest_data.json")
+ .toURI(),
+ StandardCharsets.UTF_8)
+ .replace("\n", "")
+ + "\n");
+ Assertions.assertFalse(response.isErrors(), response.getResponse());
+ // waiting index refresh
+ Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+ // NOTE(review): only one document is indexed above. The expected 3 presumably counts
+ // 1 root doc + 2 nested "address" entries as separate docs in getIndexDocsCount — confirm.
+ Assertions.assertEquals(
+ 3,
esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount());
+ }
+
private void createIndexWithFullType() throws IOException,
InterruptedException {
String mapping =
IOUtils.toString(
@@ -202,6 +228,21 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
+ /**
+ * Runs the nested-type copy job (st_index_nest -> st_index_nest_copy) and checks that the
+ * copied document, including its nested "address" array, matches the input data.
+ */
+ @TestTemplate
+ public void testElasticsearchWithNestSchema(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ List<String> sinkData = readSinkDataWithNestSchema("st_index_nest_copy");
+ String expected =
+ "{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New York\",\"street\":\"123 Main St\"},"
+ + "{\"zipcode\":\"90001\",\"city\":\"Los Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}";
+
+ // Compare parsed JSON trees rather than raw strings. The source deserializer builds each
+ // nested object as a HashMap, so the serialized key order is an implementation detail.
+ // Object-node equality ignores key order; array element order is still verified.
+ List<JsonNode> actual =
+ sinkData.stream().<JsonNode>map(JsonUtils::parseObject).collect(Collectors.toList());
+ Assertions.assertIterableEquals(
+ Collections.singletonList(JsonUtils.parseObject(expected)), actual);
+ }
+
@TestTemplate
public void testElasticsSearchWithMultiSourceByFilter(TestContainer
container)
throws InterruptedException, IOException {
@@ -546,6 +587,13 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
return getDocsWithTransformTimestamp(source, index);
}
+ /**
+ * Waits for {@code index} to refresh, then reads back the "name" and "address" fields of every
+ * document as JSON strings.
+ */
+ private List<String> readSinkDataWithNestSchema(String index) throws InterruptedException {
+ // documents written by the job may not be searchable until the index refreshes
+ Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+ return getDocsWithNestType(Lists.newArrayList("name", "address"), index);
+ }
+
private List<String> readMultiSinkData(String index, List<String> source)
throws InterruptedException {
// wait for index refresh
@@ -604,6 +652,25 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
return docs;
}
+ /**
+ * Scrolls all documents of {@code index} with a match_all query limited to the {@code source}
+ * fields. Each document is returned serialized as JSON after its _index, _type and _id keys are
+ * removed.
+ */
+ private List<String> getDocsWithNestType(List<String> source, String index) {
+ Map<String, Object> matchAllQuery = new HashMap<>();
+ matchAllQuery.put("match_all", new HashMap<>());
+ ScrollResult result = esRestClient.searchByScroll(index, source, matchAllQuery, "1m", 1000);
+ List<String> metadataKeys = Lists.newArrayList("_index", "_type", "_id");
+ // strip the metadata keys in place so only the document body is serialized
+ result.getDocs()
+ .forEach(
+ doc -> {
+ for (String key : metadataKeys) {
+ doc.remove(key);
+ }
+ });
+ return result.getDocs().stream()
+ .map(JsonUtils::toJsonString)
+ .collect(Collectors.toList());
+ }
+
private List<String> getDocsWithTransformDate(List<String> source, String
index) {
return getDocsWithTransformDate(source, index,
Collections.emptyList());
}
@@ -739,6 +806,13 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
.collect(Collectors.toList());
}
+ /**
+ * Normalizes each JSON string in {@code testDataset} by parsing it and re-serializing the tree.
+ * NOTE(review): nothing added in this patch calls this helper — confirm it is needed.
+ */
+ private List<String> mapTestDatasetForNest(List<String> testDataset) {
+ return testDataset.stream()
+ .map(json -> JsonUtils.parseObject(json).toString())
+ .collect(Collectors.toList());
+ }
+
/**
* Use custom filtering criteria to query data
*
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
new file mode 100644
index 0000000000..6b07c9b80f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in seatunnel config:
+###### it copies documents with a nested field from st_index_nest to st_index_nest_copy.
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ #checkpoint.interval = 10000
+}
+
+source {
+Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ index = "st_index_nest"
+ source = ["address","name"]
+ query = {"match_all": {}}
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+ }
+}
+
+transform {
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ index = "st_index_nest_copy"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
new file mode 100644
index 0000000000..b63bdf962f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
@@ -0,0 +1,15 @@
+{
+ "name": "John Doe",
+ "address": [
+ {
+ "street": "123 Main St",
+ "city": "New York",
+ "zipcode": "10001"
+ },
+ {
+ "street": "456 Elm St",
+ "city": "Los Angeles",
+ "zipcode": "90001"
+ }
+ ]
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
new file mode 100644
index 0000000000..1b4d15a102
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
@@ -0,0 +1,23 @@
+{
+ "mappings": {
+ "properties": {
+ "name": {
+ "type": "text"
+ },
+ "address": {
+ "type": "nested",
+ "properties": {
+ "street": {
+ "type": "text"
+ },
+ "city": {
+ "type": "keyword"
+ },
+ "zipcode": {
+ "type": "keyword"
+ }
+ }
+ }
+ }
+ }
+}