This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a573b8dbe [Bug][Connector-V2][ES]Fix es source no data (#4076)
a573b8dbe is described below

commit a573b8dbed3a176ae6ed18dc58854351adb08d74
Author: 王一川 <[email protected]>
AuthorDate: Tue Feb 14 23:47:13 2023 +0800

    [Bug][Connector-V2][ES]Fix es source no data (#4076)
    
    * [Bug][Connector-v2][es] fix es source null pointer
    
    * [Fix][Connector-V2][ES]repair the index of complex mapping, the data 
cannot be obtained through the form of a.b
    
    * [Fix][Connector-V2][ES]compatible with es5 mapping
---
 seatunnel-connectors-v2/connector-elasticsearch/pom.xml | 13 +++++++++++++
 .../seatunnel/elasticsearch/client/EsRestClient.java    |  7 ++++++-
 .../source/DefaultSeaTunnelRowDeserializer.java         | 17 ++++++++++++++++-
 3 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml 
b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
index 2b9716fd6..9c93a8a7c 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
+++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
@@ -29,6 +29,7 @@
     <artifactId>connector-elasticsearch</artifactId>
     <properties>
         
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
+        <guava.version>31.1-jre</guava.version>
     </properties>
 
     <dependencies>
@@ -52,6 +53,18 @@
             <groupId>io.airlift</groupId>
             <artifactId>security</artifactId>
             <version>206</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- I'm not sure which version of guava to use, so I choose the 
latest; sure enough, the version inherited from the parent project is not 
compatible -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
         </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 0831adf9d..068e38623 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -462,7 +462,12 @@ public class EsRestClient {
                 } else {
                     for (Iterator<JsonNode> iter = 
mappingsProperty.iterator(); iter.hasNext(); ) {
                         JsonNode typeNode = iter.next();
-                        JsonNode properties = typeNode.get("properties");
+                        JsonNode properties;
+                        if (typeNode.has("properties")) {
+                            properties = typeNode.get("properties");
+                        } else {
+                            properties = typeNode;
+                        }
                         
mapping.putAll(getFieldTypeMappingFromProperties(properties, source));
                     }
                 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 35b4d656b..0cd06dcaa 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -107,7 +108,7 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
         try {
             for (int i = 0; i < rowTypeInfo.getTotalFields(); i++) {
                 fieldName = rowTypeInfo.getFieldName(i);
-                value = rowRecord.getDoc().get(fieldName);
+                value = recursiveGet(rowRecord.getDoc(), fieldName);
                 if (value != null) {
                     seaTunnelDataType = rowTypeInfo.getFieldType(i);
                     seaTunnelFields[i] = convertValue(seaTunnelDataType, 
value.toString());
@@ -199,4 +200,18 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
         }
         return LocalDateTime.parse(formatDate, dateTimeFormatter);
     }
+
+    Object recursiveGet(Map<String, Object> collect, String keyWithRecursive) {
+        Object value = null;
+        boolean isFirst = true;
+        for (String key : keyWithRecursive.split("\\.")) {
+            if (isFirst) {
+                value = collect.get(key);
+                isFirst = false;
+            } else if (value instanceof ObjectNode) {
+                value = ((ObjectNode) value).get(key);
+            }
+        }
+        return value;
+    }
 }

Reply via email to