This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a4ba6a171c [Hotfix][Connector-V2] Fix null not inserted in es (#7493)
a4ba6a171c is described below

commit a4ba6a171cce61794f3b7b64253bc2d7bee16451
Author: corgy-w <[email protected]>
AuthorDate: Mon Aug 26 12:41:39 2024 +0800

    [Hotfix][Connector-V2] Fix null not inserted in es (#7493)
    
    * [Hotfix][Connector-V2] Fix null not inserted in es
    
    * [Hotfix][Connector-V2] update
    
    * [Hotfix][Connector-V2] fix multi sink es test case
---
 .../serialize/ElasticsearchRowSerializer.java         |  1 +
 .../e2e/connector/elasticsearch/ElasticsearchIT.java  | 19 ++++++++++++-------
 .../elasticsearch/elasticsearch_source_and_sink.conf  |  1 +
 .../st_index_source_without_schema_and_sink.json      |  3 +++
 4 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 2f7eb86b91..18ab0ae812 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -169,6 +169,7 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
         for (int i = 0; i < fieldNames.length; i++) {
             Object value = fields[i];
             if (value == null) {
+                doc.put(fieldNames[i], null);
             } else if (value instanceof SeaTunnelRow) {
                 doc.put(
                         fieldNames[i],
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 375a6cc049..e92c399378 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -115,9 +115,10 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                         Optional.empty(),
                         Optional.empty());
         testDataset = generateTestDataSet();
+        createIndexForResourceNull("st_index");
         createIndexDocs();
         createIndexWithFullType();
-        createIndexForResourceNull();
+        createIndexForResourceNull("st_index4");
     }
 
     /** create a index,and bulk some documents */
@@ -163,14 +164,14 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                 2, 
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
     }
 
-    private void createIndexForResourceNull() throws IOException {
+    private void createIndexForResourceNull(String indexName) throws 
IOException {
         String mapping =
                 IOUtils.toString(
                         ContainerUtil.getResourcesFile(
                                         
"/elasticsearch/st_index_source_without_schema_and_sink.json")
                                 .toURI(),
                         StandardCharsets.UTF_8);
-        esRestClient.createIndex("st_index4", mapping);
+        esRestClient.createIndex(indexName, mapping);
     }
 
     @TestTemplate
@@ -268,7 +269,8 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                     "c_bytes",
                     "c_int",
                     "c_date",
-                    "c_timestamp"
+                    "c_timestamp",
+                    "c_null"
                 };
 
         List<String> documents = new ArrayList<>();
@@ -283,14 +285,16 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                         Boolean.FALSE,
                         Byte.parseByte("1"),
                         Short.parseShort("1"),
-                        i,
                         Long.parseLong("1"),
                         Float.parseFloat("1.1"),
                         Double.parseDouble("1.1"),
                         BigDecimal.valueOf(11, 1),
                         "test".getBytes(),
+                        i,
                         LocalDate.now().toString(),
-                        System.currentTimeMillis()
+                        System.currentTimeMillis(),
+                        // Null values are also a basic use case for testing
+                        null
                     };
             for (int j = 0; j < fields.length; j++) {
                 doc.put(fields[j], values[j]);
@@ -326,7 +330,8 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
                         "c_bytes",
                         "c_int",
                         "c_date",
-                        "c_timestamp");
+                        "c_timestamp",
+                        "c_null");
         return getDocsWithTransformTimestamp(source, index);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index c6c668e469..a2dc3c2446 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -51,6 +51,7 @@ source {
         c_int = int
         c_date = date
         c_timestamp = timestamp
+        c_null = "null"
       }
     }
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
index fc1adb3513..680afe1022 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_source_without_schema_and_sink.json
@@ -58,6 +58,9 @@
                 },
                 "c_tinyint": {
                     "type": "long"
+                },
+                "c_null":{
+                    "type": "long"
                 }
             }
         }

Reply via email to