This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a4ba6a171c [Hotfix][Connector-V2] Fix null not inserted in es (#7493)
a4ba6a171c is described below
commit a4ba6a171cce61794f3b7b64253bc2d7bee16451
Author: corgy-w <[email protected]>
AuthorDate: Mon Aug 26 12:41:39 2024 +0800
[Hotfix][Connector-V2] Fix null not inserted in es (#7493)
* [Hotfix][Connector-V2] Fix null not inserted in es
* [Hotfix][Connector-V2] update
* [Hotfix][Connector-V2] fix multi sink es test case
---
.../serialize/ElasticsearchRowSerializer.java | 1 +
.../e2e/connector/elasticsearch/ElasticsearchIT.java | 19 ++++++++++++-------
.../elasticsearch/elasticsearch_source_and_sink.conf | 1 +
.../st_index_source_without_schema_and_sink.json | 3 +++
4 files changed, 17 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 2f7eb86b91..18ab0ae812 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -169,6 +169,7 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
for (int i = 0; i < fieldNames.length; i++) {
Object value = fields[i];
if (value == null) {
+ doc.put(fieldNames[i], null);
} else if (value instanceof SeaTunnelRow) {
doc.put(
fieldNames[i],
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 375a6cc049..e92c399378 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -115,9 +115,10 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Optional.empty(),
Optional.empty());
testDataset = generateTestDataSet();
+ createIndexForResourceNull("st_index");
createIndexDocs();
createIndexWithFullType();
- createIndexForResourceNull();
+ createIndexForResourceNull("st_index4");
}
/** create a index,and bulk some documents */
@@ -163,14 +164,14 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
2,
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
}
- private void createIndexForResourceNull() throws IOException {
+ private void createIndexForResourceNull(String indexName) throws
IOException {
String mapping =
IOUtils.toString(
ContainerUtil.getResourcesFile(
"/elasticsearch/st_index_source_without_schema_and_sink.json")
.toURI(),
StandardCharsets.UTF_8);
- esRestClient.createIndex("st_index4", mapping);
+ esRestClient.createIndex(indexName, mapping);
}
@TestTemplate
@@ -268,7 +269,8 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
"c_bytes",
"c_int",
"c_date",
- "c_timestamp"
+ "c_timestamp",
+ "c_null"
};
List<String> documents = new ArrayList<>();
@@ -283,14 +285,16 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Boolean.FALSE,
Byte.parseByte("1"),
Short.parseShort("1"),
- i,
Long.parseLong("1"),
Float.parseFloat("1.1"),
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
+ i,
LocalDate.now().toString(),
- System.currentTimeMillis()
+ System.currentTimeMillis(),
+ // Null values are also a basic use case for testing
+ null
};
for (int j = 0; j < fields.length; j++) {
doc.put(fields[j], values[j]);
@@ -326,7 +330,8 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
"c_bytes",
"c_int",
"c_date",
- "c_timestamp");
+ "c_timestamp",
+ "c_null");
return getDocsWithTransformTimestamp(source, index);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index c6c668e469..a2dc3c2446 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -51,6 +51,7 @@ source {
c_int = int
c_date = date
c_timestamp = timestamp
+ c_null = "null"
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
index fc1adb3513..680afe1022 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
@@ -58,6 +58,9 @@
},
"c_tinyint": {
"type": "long"
+ },
+ "c_null":{
+ "type": "long"
}
}
}