This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c68944893a Revert "[Feature][connector-elasticsearch] elasticsearch 
support nested type (#8462)" (#8485)
c68944893a is described below

commit c68944893a7f03140b0d27e90312f52bc87983c7
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 9 15:23:39 2025 +0800

    Revert "[Feature][connector-elasticsearch] elasticsearch support nested 
type (#8462)" (#8485)
---
 .../apache/seatunnel/api/table/type/ArrayType.java |  2 +-
 .../seatunnel/api/table/type/SeaTunnelRow.java     | 10 ---
 .../catalog/ElasticSearchTypeConverter.java        |  8 +--
 .../source/DefaultSeaTunnelRowDeserializer.java    | 30 ---------
 .../connector/elasticsearch/ElasticsearchIT.java   | 74 ----------------------
 .../elasticsearch_source_and_sink_with_nest.conf   | 53 ----------------
 .../elasticsearch/st_index_nest_data.json          | 15 -----
 .../elasticsearch/st_index_nest_mapping.json       | 23 -------
 8 files changed, 3 insertions(+), 212 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 65f7651e79..36c3362108 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -56,7 +56,7 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
     private final Class<T> arrayClass;
     private final SeaTunnelDataType<E> elementType;
 
-    public ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
+    protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) 
{
         this.arrayClass = arrayClass;
         this.elementType = elementType;
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 5c253b4cf5..39f61aee5d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -305,16 +305,6 @@ public final class SeaTunnelRow implements Serializable {
                 return getBytesForArray(v, BasicType.FLOAT_TYPE);
             case "Double[]":
                 return getBytesForArray(v, BasicType.DOUBLE_TYPE);
-            case "Map[]":
-                int sizeMaps = 0;
-                for (Map o : (Map[]) v) {
-                    for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
-                        sizeMaps +=
-                                getBytesForValue(entry.getKey())
-                                        + getBytesForValue(entry.getValue());
-                    }
-                }
-                return sizeMaps;
             case "HashMap":
             case "LinkedHashMap":
                 int size = 0;
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
index d92d083998..412342cb82 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
@@ -64,6 +64,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsT
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
@@ -149,12 +150,6 @@ public class ElasticSearchTypeConverter implements 
BasicTypeConverter<BasicTypeD
                                 });
                 builder.dataType(rowType);
                 break;
-            case EsType.NESTED:
-                builder.dataType(
-                        new ArrayType<>(
-                                Map[].class,
-                                new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE)));
-                break;
             case INTEGER:
             case TOKEN_COUNT:
                 builder.dataType(BasicType.INT_TYPE);
@@ -212,6 +207,7 @@ public class ElasticSearchTypeConverter implements 
BasicTypeConverter<BasicTypeD
             case COMPLETION:
             case STRING:
             case GEO_SHAPE:
+            case NESTED:
             case PERCOLATOR:
             case POINT:
             case RANK_FEATURES:
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index f39aeae8fd..fd176f2f03 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -46,7 +46,6 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
@@ -178,35 +177,6 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
             } else if (fieldType instanceof ArrayType) {
                 ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
                 SeaTunnelDataType<?> elementType = arrayType.getElementType();
-                if (elementType instanceof MapType) {
-                    MapType<?, ?> mapType = (MapType<?, ?>) elementType;
-                    List<Map> mapList = JsonUtils.toList(fieldValue, 
Map.class);
-                    Object arr = Array.newInstance(elementType.getTypeClass(), 
mapList.size());
-                    SeaTunnelDataType<?> keyType = mapType.getKeyType();
-                    SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    for (int i = 0; i < mapList.size(); i++) {
-                        Map<String, String> map = mapList.get(i);
-                        Map<Object, Object> convertMap = new HashMap<>();
-                        for (Map.Entry entry : map.entrySet()) {
-                            Object convertKey =
-                                    convertValue(
-                                            keyType,
-                                            Objects.isNull(entry.getKey())
-                                                    ? null
-                                                    : 
String.valueOf(entry.getKey()));
-                            Object convertValue =
-                                    convertValue(
-                                            valueType,
-                                            Objects.isNull(entry.getValue())
-                                                    ? null
-                                                    : 
String.valueOf(entry.getValue()));
-                            convertMap.put(convertKey, convertValue);
-                        }
-                        Array.set(arr, i, convertMap);
-                    }
-                    return arr;
-                }
-
                 List<String> stringList = JsonUtils.toList(fieldValue, 
String.class);
                 Object arr = Array.newInstance(elementType.getTypeClass(), 
stringList.size());
                 for (int i = 0; i < stringList.size(); i++) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index fa805d851b..87730fee46 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -133,7 +133,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         createIndexDocs();
         createIndexWithFullType();
         createIndexForResourceNull("st_index4");
-        createIndexWithNestType();
     }
 
     /** create a index,and bulk some documents */
@@ -157,31 +156,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         esRestClient.bulk(requestBody.toString());
     }
 
-    private void createIndexWithNestType() throws IOException, 
InterruptedException {
-        String mapping =
-                IOUtils.toString(
-                        
ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json")
-                                .toURI(),
-                        StandardCharsets.UTF_8);
-        esRestClient.createIndex("st_index_nest", mapping);
-        esRestClient.createIndex("st_index_nest_copy", mapping);
-        BulkResponse response =
-                esRestClient.bulk(
-                        "{ \"index\" : { \"_index\" : \"st_index_nest\", 
\"_id\" : \"1\" } }\n"
-                                + IOUtils.toString(
-                                                ContainerUtil.getResourcesFile(
-                                                                
"/elasticsearch/st_index_nest_data.json")
-                                                        .toURI(),
-                                                StandardCharsets.UTF_8)
-                                        .replace("\n", "")
-                                + "\n");
-        Assertions.assertFalse(response.isErrors(), response.getResponse());
-        // waiting index refresh
-        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
-        Assertions.assertEquals(
-                3, 
esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount());
-    }
-
     private void createIndexWithFullType() throws IOException, 
InterruptedException {
         String mapping =
                 IOUtils.toString(
@@ -228,21 +202,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
-    @TestTemplate
-    public void testElasticsearchWithNestSchema(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                
container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-
-        List<String> sinkData = 
readSinkDataWithNestSchema("st_index_nest_copy");
-        String data =
-                "{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New 
York\",\"street\":\"123 Main St\"},"
-                        + "{\"zipcode\":\"90001\",\"city\":\"Los 
Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}";
-
-        Assertions.assertIterableEquals(Lists.newArrayList(data), sinkData);
-    }
-
     @TestTemplate
     public void testElasticsSearchWithMultiSourceByFilter(TestContainer 
container)
             throws InterruptedException, IOException {
@@ -587,13 +546,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         return getDocsWithTransformTimestamp(source, index);
     }
 
-    private List<String> readSinkDataWithNestSchema(String index) throws 
InterruptedException {
-        // wait for index refresh
-        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
-        List<String> source = Lists.newArrayList("name", "address");
-        return getDocsWithNestType(source, index);
-    }
-
     private List<String> readMultiSinkData(String index, List<String> source)
             throws InterruptedException {
         // wait for index refresh
@@ -652,25 +604,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         return docs;
     }
 
-    private List<String> getDocsWithNestType(List<String> source, String 
index) {
-        Map<String, Object> query = new HashMap<>();
-        query.put("match_all", new HashMap<>());
-        ScrollResult scrollResult = esRestClient.searchByScroll(index, source, 
query, "1m", 1000);
-        scrollResult
-                .getDocs()
-                .forEach(
-                        x -> {
-                            x.remove("_index");
-                            x.remove("_type");
-                            x.remove("_id");
-                        });
-        List<String> docs =
-                scrollResult.getDocs().stream()
-                        .map(JsonUtils::toJsonString)
-                        .collect(Collectors.toList());
-        return docs;
-    }
-
     private List<String> getDocsWithTransformDate(List<String> source, String 
index) {
         return getDocsWithTransformDate(source, index, 
Collections.emptyList());
     }
@@ -806,13 +739,6 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                 .collect(Collectors.toList());
     }
 
-    private List<String> mapTestDatasetForNest(List<String> testDataset) {
-        return testDataset.stream()
-                .map(JsonUtils::parseObject)
-                .map(JsonNode::toString)
-                .collect(Collectors.toList());
-    }
-
     /**
      * Use custom filtering criteria to query data
      *
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
deleted file mode 100644
index 6b07c9b80f..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  #checkpoint.interval = 10000
-}
-
-source {
-Elasticsearch {
-    hosts = ["https://elasticsearch:9200";]
-    username = "elastic"
-    password = "elasticsearch"
-    index = "st_index_nest"
-    source = ["address","name"]
-    query = {"match_all": {}}
-    tls_verify_certificate = false
-    tls_verify_hostname = false
- }
-}
-
-transform {
-}
-
-sink {
-    Elasticsearch {
-    hosts = ["https://elasticsearch:9200";]
-    username = "elastic"
-    password = "elasticsearch"
-    index = "st_index_nest_copy"
-    tls_verify_certificate = false
-    tls_verify_hostname = false
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
deleted file mode 100644
index b63bdf962f..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "name": "John Doe",
-  "address": [
-    {
-      "street": "123 Main St",
-      "city": "New York",
-      "zipcode": "10001"
-    },
-    {
-      "street": "456 Elm St",
-      "city": "Los Angeles",
-      "zipcode": "90001"
-    }
-  ]
-}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
deleted file mode 100644
index 1b4d15a102..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "mappings": {
-    "properties": {
-      "name": {
-        "type": "text"
-      },
-      "address": {
-        "type": "nested",
-        "properties": {
-          "street": {
-            "type": "text"
-          },
-          "city": {
-            "type": "keyword"
-          },
-          "zipcode": {
-            "type": "keyword"
-          }
-        }
-      }
-    }
-  }
-}

Reply via email to