This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eaa15e4c8d [Feature][connector-elasticsearch] elasticsearch support nested type (#8462)
eaa15e4c8d is described below

commit eaa15e4c8d72c8e002c4e6b663776ede4e2505b8
Author: CosmosNi <[email protected]>
AuthorDate: Tue Jan 7 18:58:32 2025 +0800

    [Feature][connector-elasticsearch] elasticsearch support nested type (#8462)
---
 .../apache/seatunnel/api/table/type/ArrayType.java |  2 +-
 .../seatunnel/api/table/type/SeaTunnelRow.java     | 10 +++
 .../catalog/ElasticSearchTypeConverter.java        |  8 ++-
 .../source/DefaultSeaTunnelRowDeserializer.java    | 30 +++++++++
 .../connector/elasticsearch/ElasticsearchIT.java   | 74 ++++++++++++++++++++++
 .../elasticsearch_source_and_sink_with_nest.conf   | 53 ++++++++++++++++
 .../elasticsearch/st_index_nest_data.json          | 15 +++++
 .../elasticsearch/st_index_nest_mapping.json       | 23 +++++++
 8 files changed, 212 insertions(+), 3 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 36c3362108..65f7651e79 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -56,7 +56,7 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
     private final Class<T> arrayClass;
     private final SeaTunnelDataType<E> elementType;
 
-    protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
+    public ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
         this.arrayClass = arrayClass;
         this.elementType = elementType;
     }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 39f61aee5d..5c253b4cf5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -305,6 +305,16 @@ public final class SeaTunnelRow implements Serializable {
                 return getBytesForArray(v, BasicType.FLOAT_TYPE);
             case "Double[]":
                 return getBytesForArray(v, BasicType.DOUBLE_TYPE);
+            case "Map[]":
+                int sizeMaps = 0;
+                for (Map o : (Map[]) v) {
+                    for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
+                        sizeMaps +=
+                                getBytesForValue(entry.getKey())
+                                        + getBytesForValue(entry.getValue());
+                    }
+                }
+                return sizeMaps;
             case "HashMap":
             case "LinkedHashMap":
                 int size = 0;
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
index 412342cb82..d92d083998 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
@@ -64,7 +64,6 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsT
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
-import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
@@ -150,6 +149,12 @@ public class ElasticSearchTypeConverter implements BasicTypeConverter<BasicTypeD
                                 });
                 builder.dataType(rowType);
                 break;
+            case EsType.NESTED:
+                builder.dataType(
+                        new ArrayType<>(
+                                Map[].class,
+                                new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
+                break;
             case INTEGER:
             case TOKEN_COUNT:
                 builder.dataType(BasicType.INT_TYPE);
@@ -207,7 +212,6 @@ public class ElasticSearchTypeConverter implements BasicTypeConverter<BasicTypeD
             case COMPLETION:
             case STRING:
             case GEO_SHAPE:
-            case NESTED:
             case PERCOLATOR:
             case POINT:
             case RANK_FEATURES:
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index fd176f2f03..f39aeae8fd 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -46,6 +46,7 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
@@ -177,6 +178,35 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
             } else if (fieldType instanceof ArrayType) {
                 ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
                 SeaTunnelDataType<?> elementType = arrayType.getElementType();
+                if (elementType instanceof MapType) {
+                    MapType<?, ?> mapType = (MapType<?, ?>) elementType;
+                    List<Map> mapList = JsonUtils.toList(fieldValue, Map.class);
+                    Object arr = Array.newInstance(elementType.getTypeClass(), mapList.size());
+                    SeaTunnelDataType<?> keyType = mapType.getKeyType();
+                    SeaTunnelDataType<?> valueType = mapType.getValueType();
+                    for (int i = 0; i < mapList.size(); i++) {
+                        Map<String, String> map = mapList.get(i);
+                        Map<Object, Object> convertMap = new HashMap<>();
+                        for (Map.Entry entry : map.entrySet()) {
+                            Object convertKey =
+                                    convertValue(
+                                            keyType,
+                                            Objects.isNull(entry.getKey())
+                                                    ? null
+                                                    : String.valueOf(entry.getKey()));
+                            Object convertValue =
+                                    convertValue(
+                                            valueType,
+                                            Objects.isNull(entry.getValue())
+                                                    ? null
+                                                    : String.valueOf(entry.getValue()));
+                            convertMap.put(convertKey, convertValue);
+                        }
+                        Array.set(arr, i, convertMap);
+                    }
+                    return arr;
+                }
+
                 List<String> stringList = JsonUtils.toList(fieldValue, String.class);
                 Object arr = Array.newInstance(elementType.getTypeClass(), stringList.size());
                 for (int i = 0; i < stringList.size(); i++) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 87730fee46..fa805d851b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -133,6 +133,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         createIndexDocs();
         createIndexWithFullType();
         createIndexForResourceNull("st_index4");
+        createIndexWithNestType();
     }
 
     /** create a index,and bulk some documents */
@@ -156,6 +157,31 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         esRestClient.bulk(requestBody.toString());
     }
 
+    private void createIndexWithNestType() throws IOException, InterruptedException {
+        String mapping =
+                IOUtils.toString(
+                        ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json")
+                                .toURI(),
+                        StandardCharsets.UTF_8);
+        esRestClient.createIndex("st_index_nest", mapping);
+        esRestClient.createIndex("st_index_nest_copy", mapping);
+        BulkResponse response =
+                esRestClient.bulk(
+                        "{ \"index\" : { \"_index\" : \"st_index_nest\", \"_id\" : \"1\" } }\n"
+                                + IOUtils.toString(
+                                                ContainerUtil.getResourcesFile(
+                                                                "/elasticsearch/st_index_nest_data.json")
+                                                        .toURI(),
+                                                StandardCharsets.UTF_8)
+                                        .replace("\n", "")
+                                + "\n");
+        Assertions.assertFalse(response.isErrors(), response.getResponse());
+        // waiting index refresh
+        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+        Assertions.assertEquals(
+                3, esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount());
+    }
+
     private void createIndexWithFullType() throws IOException, 
InterruptedException {
         String mapping =
                 IOUtils.toString(
@@ -202,6 +228,21 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
+    @TestTemplate
+    public void testElasticsearchWithNestSchema(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        List<String> sinkData = readSinkDataWithNestSchema("st_index_nest_copy");
+        String data =
+                "{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New York\",\"street\":\"123 Main St\"},"
+                        + "{\"zipcode\":\"90001\",\"city\":\"Los Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}";
+
+        Assertions.assertIterableEquals(Lists.newArrayList(data), sinkData);
+    }
+
     @TestTemplate
     public void testElasticsSearchWithMultiSourceByFilter(TestContainer container)
             throws InterruptedException, IOException {
@@ -546,6 +587,13 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         return getDocsWithTransformTimestamp(source, index);
     }
 
+    private List<String> readSinkDataWithNestSchema(String index) throws InterruptedException {
+        // wait for index refresh
+        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+        List<String> source = Lists.newArrayList("name", "address");
+        return getDocsWithNestType(source, index);
+    }
+
     private List<String> readMultiSinkData(String index, List<String> source)
             throws InterruptedException {
         // wait for index refresh
@@ -604,6 +652,25 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         return docs;
     }
 
+    private List<String> getDocsWithNestType(List<String> source, String index) {
+        Map<String, Object> query = new HashMap<>();
+        query.put("match_all", new HashMap<>());
+        ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
+        scrollResult
+                .getDocs()
+                .forEach(
+                        x -> {
+                            x.remove("_index");
+                            x.remove("_type");
+                            x.remove("_id");
+                        });
+        List<String> docs =
+                scrollResult.getDocs().stream()
+                        .map(JsonUtils::toJsonString)
+                        .collect(Collectors.toList());
+        return docs;
+    }
+
     private List<String> getDocsWithTransformDate(List<String> source, String index) {
         return getDocsWithTransformDate(source, index, Collections.emptyList());
     }
@@ -739,6 +806,13 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
                 .collect(Collectors.toList());
     }
 
+    private List<String> mapTestDatasetForNest(List<String> testDataset) {
+        return testDataset.stream()
+                .map(JsonUtils::parseObject)
+                .map(JsonNode::toString)
+                .collect(Collectors.toList());
+    }
+
     /**
      * Use custom filtering criteria to query data
      *
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
new file mode 100644
index 0000000000..6b07c9b80f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  #checkpoint.interval = 10000
+}
+
+source {
+Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    index = "st_index_nest"
+    source = ["address","name"]
+    query = {"match_all": {}}
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+ }
+}
+
+transform {
+}
+
+sink {
+    Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    index = "st_index_nest_copy"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
new file mode 100644
index 0000000000..b63bdf962f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
@@ -0,0 +1,15 @@
+{
+  "name": "John Doe",
+  "address": [
+    {
+      "street": "123 Main St",
+      "city": "New York",
+      "zipcode": "10001"
+    },
+    {
+      "street": "456 Elm St",
+      "city": "Los Angeles",
+      "zipcode": "90001"
+    }
+  ]
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
new file mode 100644
index 0000000000..1b4d15a102
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
@@ -0,0 +1,23 @@
+{
+  "mappings": {
+    "properties": {
+      "name": {
+        "type": "text"
+      },
+      "address": {
+        "type": "nested",
+        "properties": {
+          "street": {
+            "type": "text"
+          },
+          "city": {
+            "type": "keyword"
+          },
+          "zipcode": {
+            "type": "keyword"
+          }
+        }
+      }
+    }
+  }
+}

Reply via email to