This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c68944893a Revert "[Feature][connector-elasticsearch] elasticsearch support nested type (#8462)" (#8485)
c68944893a is described below
commit c68944893a7f03140b0d27e90312f52bc87983c7
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 9 15:23:39 2025 +0800
Revert "[Feature][connector-elasticsearch] elasticsearch support nested type (#8462)" (#8485)
---
.../apache/seatunnel/api/table/type/ArrayType.java | 2 +-
.../seatunnel/api/table/type/SeaTunnelRow.java | 10 ---
.../catalog/ElasticSearchTypeConverter.java | 8 +--
.../source/DefaultSeaTunnelRowDeserializer.java | 30 ---------
.../connector/elasticsearch/ElasticsearchIT.java | 74 ----------------------
.../elasticsearch_source_and_sink_with_nest.conf | 53 ----------------
.../elasticsearch/st_index_nest_data.json | 15 -----
.../elasticsearch/st_index_nest_mapping.json | 23 -------
8 files changed, 3 insertions(+), 212 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 65f7651e79..36c3362108 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -56,7 +56,7 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
private final Class<T> arrayClass;
private final SeaTunnelDataType<E> elementType;
- public ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
+ protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
this.arrayClass = arrayClass;
this.elementType = elementType;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 5c253b4cf5..39f61aee5d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -305,16 +305,6 @@ public final class SeaTunnelRow implements Serializable {
return getBytesForArray(v, BasicType.FLOAT_TYPE);
case "Double[]":
return getBytesForArray(v, BasicType.DOUBLE_TYPE);
- case "Map[]":
- int sizeMaps = 0;
- for (Map o : (Map[]) v) {
- for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
- sizeMaps +=
- getBytesForValue(entry.getKey())
- + getBytesForValue(entry.getValue());
- }
- }
- return sizeMaps;
case "HashMap":
case "LinkedHashMap":
int size = 0;
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
index d92d083998..412342cb82 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java
@@ -64,6 +64,7 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsT
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
+import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
@@ -149,12 +150,6 @@ public class ElasticSearchTypeConverter implements BasicTypeConverter<BasicTypeD
});
builder.dataType(rowType);
break;
- case EsType.NESTED:
- builder.dataType(
- new ArrayType<>(
- Map[].class,
- new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
- break;
case INTEGER:
case TOKEN_COUNT:
builder.dataType(BasicType.INT_TYPE);
@@ -212,6 +207,7 @@ public class ElasticSearchTypeConverter implements BasicTypeConverter<BasicTypeD
case COMPLETION:
case STRING:
case GEO_SHAPE:
+ case NESTED:
case PERCOLATOR:
case POINT:
case RANK_FEATURES:
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index f39aeae8fd..fd176f2f03 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -46,7 +46,6 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
@@ -178,35 +177,6 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
} else if (fieldType instanceof ArrayType) {
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
SeaTunnelDataType<?> elementType = arrayType.getElementType();
- if (elementType instanceof MapType) {
- MapType<?, ?> mapType = (MapType<?, ?>) elementType;
- List<Map> mapList = JsonUtils.toList(fieldValue, Map.class);
- Object arr = Array.newInstance(elementType.getTypeClass(), mapList.size());
- SeaTunnelDataType<?> keyType = mapType.getKeyType();
- SeaTunnelDataType<?> valueType = mapType.getValueType();
- for (int i = 0; i < mapList.size(); i++) {
- Map<String, String> map = mapList.get(i);
- Map<Object, Object> convertMap = new HashMap<>();
- for (Map.Entry entry : map.entrySet()) {
- Object convertKey =
- convertValue(
- keyType,
- Objects.isNull(entry.getKey())
- ? null
- : String.valueOf(entry.getKey()));
- Object convertValue =
- convertValue(
- valueType,
- Objects.isNull(entry.getValue())
- ? null
- : String.valueOf(entry.getValue()));
- convertMap.put(convertKey, convertValue);
- }
- Array.set(arr, i, convertMap);
- }
- return arr;
- }
-
List<String> stringList = JsonUtils.toList(fieldValue, String.class);
Object arr = Array.newInstance(elementType.getTypeClass(), stringList.size());
for (int i = 0; i < stringList.size(); i++) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index fa805d851b..87730fee46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -133,7 +133,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
createIndexDocs();
createIndexWithFullType();
createIndexForResourceNull("st_index4");
- createIndexWithNestType();
}
/** create a index,and bulk some documents */
@@ -157,31 +156,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
esRestClient.bulk(requestBody.toString());
}
- private void createIndexWithNestType() throws IOException, InterruptedException {
- String mapping =
- IOUtils.toString(
- ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json")
- .toURI(),
- StandardCharsets.UTF_8);
- esRestClient.createIndex("st_index_nest", mapping);
- esRestClient.createIndex("st_index_nest_copy", mapping);
- BulkResponse response =
- esRestClient.bulk(
- "{ \"index\" : { \"_index\" : \"st_index_nest\", \"_id\" : \"1\" } }\n"
- + IOUtils.toString(
- ContainerUtil.getResourcesFile(
- "/elasticsearch/st_index_nest_data.json")
- .toURI(),
- StandardCharsets.UTF_8)
- .replace("\n", "")
- + "\n");
- Assertions.assertFalse(response.isErrors(), response.getResponse());
- // waiting index refresh
- Thread.sleep(INDEX_REFRESH_MILL_DELAY);
- Assertions.assertEquals(
- 3, esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount());
- }
-
private void createIndexWithFullType() throws IOException, InterruptedException {
String mapping =
IOUtils.toString(
@@ -228,21 +202,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
- @TestTemplate
- public void testElasticsearchWithNestSchema(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
-
- List<String> sinkData = readSinkDataWithNestSchema("st_index_nest_copy");
- String data =
- "{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New York\",\"street\":\"123 Main St\"},"
- + "{\"zipcode\":\"90001\",\"city\":\"Los Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}";
-
- Assertions.assertIterableEquals(Lists.newArrayList(data), sinkData);
- }
-
@TestTemplate
public void testElasticsSearchWithMultiSourceByFilter(TestContainer container)
throws InterruptedException, IOException {
@@ -587,13 +546,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
return getDocsWithTransformTimestamp(source, index);
}
- private List<String> readSinkDataWithNestSchema(String index) throws InterruptedException {
- // wait for index refresh
- Thread.sleep(INDEX_REFRESH_MILL_DELAY);
- List<String> source = Lists.newArrayList("name", "address");
- return getDocsWithNestType(source, index);
- }
-
private List<String> readMultiSinkData(String index, List<String> source)
throws InterruptedException {
// wait for index refresh
@@ -652,25 +604,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
return docs;
}
- private List<String> getDocsWithNestType(List<String> source, String index) {
- Map<String, Object> query = new HashMap<>();
- query.put("match_all", new HashMap<>());
- ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
- scrollResult
- .getDocs()
- .forEach(
- x -> {
- x.remove("_index");
- x.remove("_type");
- x.remove("_id");
- });
- List<String> docs =
- scrollResult.getDocs().stream()
- .map(JsonUtils::toJsonString)
- .collect(Collectors.toList());
- return docs;
- }
-
private List<String> getDocsWithTransformDate(List<String> source, String index) {
return getDocsWithTransformDate(source, index, Collections.emptyList());
}
@@ -806,13 +739,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
.collect(Collectors.toList());
}
- private List<String> mapTestDatasetForNest(List<String> testDataset) {
- return testDataset.stream()
- .map(JsonUtils::parseObject)
- .map(JsonNode::toString)
- .collect(Collectors.toList());
- }
-
/**
* Use custom filtering criteria to query data
*
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
deleted file mode 100644
index 6b07c9b80f..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- #checkpoint.interval = 10000
-}
-
-source {
-Elasticsearch {
- hosts = ["https://elasticsearch:9200"]
- username = "elastic"
- password = "elasticsearch"
- index = "st_index_nest"
- source = ["address","name"]
- query = {"match_all": {}}
- tls_verify_certificate = false
- tls_verify_hostname = false
- }
-}
-
-transform {
-}
-
-sink {
- Elasticsearch {
- hosts = ["https://elasticsearch:9200"]
- username = "elastic"
- password = "elasticsearch"
- index = "st_index_nest_copy"
- tls_verify_certificate = false
- tls_verify_hostname = false
- }
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
deleted file mode 100644
index b63bdf962f..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "name": "John Doe",
- "address": [
- {
- "street": "123 Main St",
- "city": "New York",
- "zipcode": "10001"
- },
- {
- "street": "456 Elm St",
- "city": "Los Angeles",
- "zipcode": "90001"
- }
- ]
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
deleted file mode 100644
index 1b4d15a102..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
- "mappings": {
- "properties": {
- "name": {
- "type": "text"
- },
- "address": {
- "type": "nested",
- "properties": {
- "street": {
- "type": "text"
- },
- "city": {
- "type": "keyword"
- },
- "zipcode": {
- "type": "keyword"
- }
- }
- }
- }
- }
-}