This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ddd92383b8 [Improve][Elasticsearch] Add LocalDateTime serialization 
test and simplify serializer (#10135)
ddd92383b8 is described below

commit ddd92383b8b7f0cb17d67b96ed28b27f4b434b72
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Dec 7 22:27:55 2025 +0800

    [Improve][Elasticsearch] Add LocalDateTime serialization test and simplify 
serializer (#10135)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../serialize/ElasticsearchRowSerializer.java      | 55 +++++++++++-----------
 .../serialize/ElasticsearchRowSerializerTest.java  | 30 ++++++++++++
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index aa1e46d283..5b97c12c31 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -45,6 +45,7 @@ import java.util.function.Function;
 
 /** use in elasticsearch version >= 2.x and <= 8.x */
 public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
+
     private final SeaTunnelRowType seaTunnelRowType;
     private final ObjectMapper objectMapper = new ObjectMapper();
 
@@ -201,49 +202,49 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
     }
 
     private Object convertValue(String fieldName, Object value) {
+        if (value == null) {
+            return null;
+        }
+
         if (value instanceof Temporal) {
             // jackson not support jdk8 new time api
             return value.toString();
-        } else if (value instanceof Map) {
+        }
+
+        if (value instanceof Map) {
             for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                 ((Map) value).put(entry.getKey(), convertValue(fieldName, 
entry.getValue()));
             }
             return value;
-        } else if (value instanceof List) {
+        }
+
+        if (value instanceof List) {
             for (int i = 0; i < ((List) value).size(); i++) {
                 ((List) value).set(i, convertValue(fieldName, ((List) 
value).get(i)));
             }
             return value;
-        } else if (value instanceof ByteBuffer) {
-            // Check if this field is configured as a vectorization field
-            if (vectorizationFields != null && 
vectorizationFields.contains(fieldName)) {
-                ByteBuffer buffer = (ByteBuffer) value;
-                Float[] floats = VectorUtils.toFloatArray(buffer);
-
-                // Use the configured dimension or calculate it from the 
buffer size
-                int dimension = vectorDimension > 0 ? vectorDimension : 
buffer.remaining() / 4;
-
-                // Read the floats from the buffer
-                for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) 
{
-                    floats[i] = buffer.getFloat();
-                }
+        }
 
-                return floats;
-            } else {
-                // Default behavior for ByteBuffer fields not specified as 
vectorization fields
-                ByteBuffer buffer = (ByteBuffer) value;
-                Float[] floats = VectorUtils.toFloatArray(buffer);
-                int floatCount = buffer.remaining() / 4;
+        if (value instanceof ByteBuffer) {
+            ByteBuffer buffer = (ByteBuffer) value;
+            Float[] floats = VectorUtils.toFloatArray(buffer);
 
-                for (int i = 0; i < floatCount; i++) {
-                    floats[i] = buffer.getFloat();
-                }
+            // Use configured dimension for vectorization fields, otherwise 
calculate from buffer
+            int dimension =
+                    (vectorizationFields != null
+                                    && vectorizationFields.contains(fieldName)
+                                    && vectorDimension > 0)
+                            ? vectorDimension
+                            : buffer.remaining() / 4;
 
-                return floats;
+            for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) {
+                floats[i] = buffer.getFloat();
             }
-        } else {
-            return value;
+
+            return floats;
         }
+
+        return value;
     }
 
     private Map<String, String> createMetadata(@NonNull SeaTunnelRow row, 
@NonNull String key) {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index 2131bdc942..6f95a3706d 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -184,4 +185,33 @@ public class ElasticsearchRowSerializerTest {
         String upsertStr = serializer.serializeRow(row);
         Assertions.assertEquals(expected, upsertStr);
     }
+
+    @Test
+    public void testSerializeLocalDateTimeFieldFormat() {
+        String index = "st_index";
+        Map<String, Object> confMap = new HashMap<>();
+        confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+
+        ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
+        ElasticsearchClusterInfo clusterInfo =
+                
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
+        IndexInfo indexInfo = new IndexInfo(index, pluginConf);
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {"id", "ts"},
+                        new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
+
+        final ElasticsearchRowSerializer serializer =
+                new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
+
+        String id = "0001";
+        LocalDateTime ts = LocalDateTime.of(2023, 1, 2, 3, 4, 5);
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, ts});
+        row.setRowKind(RowKind.UPDATE_AFTER);
+
+        String result = serializer.serializeRow(row);
+        Assertions.assertTrue(
+                result.contains("\"ts\":\"2023-01-02T03:04:05\""),
+                "LocalDateTime field should be formatted with ISO-8601 'T' 
separator");
+    }
 }

Reply via email to