This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a573b8dbe [Bug][Connector-V2][ES]Fix es source no data (#4076)
a573b8dbe is described below
commit a573b8dbed3a176ae6ed18dc58854351adb08d74
Author: 王一川 <[email protected]>
AuthorDate: Tue Feb 14 23:47:13 2023 +0800
[Bug][Connector-V2][ES]Fix es source no data (#4076)
* [Bug][Connector-v2][es] fix es source null pointer
* [Fix][Connector-V2][ES] Fix indexes with complex (nested) mappings, where
field data could not be read using the dotted form a.b
* [Fix][Connector-V2][ES]compatible with es5 mapping
---
seatunnel-connectors-v2/connector-elasticsearch/pom.xml | 13 +++++++++++++
.../seatunnel/elasticsearch/client/EsRestClient.java | 7 ++++++-
.../source/DefaultSeaTunnelRowDeserializer.java | 17 ++++++++++++++++-
3 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
index 2b9716fd6..9c93a8a7c 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
+++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml
@@ -29,6 +29,7 @@
<artifactId>connector-elasticsearch</artifactId>
<properties>
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
+ <guava.version>31.1-jre</guava.version>
</properties>
<dependencies>
@@ -52,6 +53,18 @@
<groupId>io.airlift</groupId>
<artifactId>security</artifactId>
<version>206</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- The guava version inherited from the parent project is not compatible
with this connector. It is unclear which version is strictly required, so the
latest release is pinned here. -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
</dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 0831adf9d..068e38623 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -462,7 +462,12 @@ public class EsRestClient {
} else {
for (Iterator<JsonNode> iter =
mappingsProperty.iterator(); iter.hasNext(); ) {
JsonNode typeNode = iter.next();
- JsonNode properties = typeNode.get("properties");
+ JsonNode properties;
+ if (typeNode.has("properties")) {
+ properties = typeNode.get("properties");
+ } else {
+ properties = typeNode;
+ }
mapping.putAll(getFieldTypeMappingFromProperties(properties, source));
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 35b4d656b..0cd06dcaa 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -107,7 +108,7 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
try {
for (int i = 0; i < rowTypeInfo.getTotalFields(); i++) {
fieldName = rowTypeInfo.getFieldName(i);
- value = rowRecord.getDoc().get(fieldName);
+ value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
seaTunnelFields[i] = convertValue(seaTunnelDataType,
value.toString());
@@ -199,4 +200,18 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
}
return LocalDateTime.parse(formatDate, dateTimeFormatter);
}
+
+    /**
+     * Resolves a field name, optionally written in dotted form (for example {@code a.b}),
+     * against a document.
+     *
+     * <p>The literal name is tried first, so a document that stores the dotted name as a
+     * single top-level key still resolves. Otherwise the name is split on {@code .} and each
+     * segment is looked up in turn through nested {@link ObjectNode} or {@link Map} values.
+     *
+     * @param collect the top-level fields of the document
+     * @param keyWithRecursive the field name; {@code .} separates the levels of a nested field
+     * @return the value found at the path, or {@code null} when a segment is missing or an
+     *     intermediate value is not an object that can be descended into
+     */
+    Object recursiveGet(Map<String, Object> collect, String keyWithRecursive) {
+        // Prefer an exact match so keys that literally contain dots are not lost.
+        if (collect.containsKey(keyWithRecursive)) {
+            return collect.get(keyWithRecursive);
+        }
+        String[] segments = keyWithRecursive.split("\\.");
+        if (segments.length == 0) {
+            // split() drops trailing empty strings, so a name made only of dots has no segments.
+            return null;
+        }
+        Object value = collect.get(segments[0]);
+        for (int i = 1; i < segments.length; i++) {
+            if (value instanceof ObjectNode) {
+                value = ((ObjectNode) value).get(segments[i]);
+            } else if (value instanceof Map) {
+                value = ((Map<?, ?>) value).get(segments[i]);
+            } else {
+                // A scalar, array or missing parent cannot hold the remaining segments;
+                // returning it would pass the parent off as the requested leaf.
+                return null;
+            }
+        }
+        return value;
+    }
}