This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ddd92383b8 [Improve][Elasticsearch] Add LocalDateTime serialization
test and simplify serializer (#10135)
ddd92383b8 is described below
commit ddd92383b8b7f0cb17d67b96ed28b27f4b434b72
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Dec 7 22:27:55 2025 +0800
[Improve][Elasticsearch] Add LocalDateTime serialization test and simplify
serializer (#10135)
Co-authored-by: zengyi <[email protected]>
---
.../serialize/ElasticsearchRowSerializer.java | 55 +++++++++++-----------
.../serialize/ElasticsearchRowSerializerTest.java | 30 ++++++++++++
2 files changed, 58 insertions(+), 27 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index aa1e46d283..5b97c12c31 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -45,6 +45,7 @@ import java.util.function.Function;
/** use in elasticsearch version >= 2.x and <= 8.x */
public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
+
private final SeaTunnelRowType seaTunnelRowType;
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -201,49 +202,49 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
}
private Object convertValue(String fieldName, Object value) {
+ if (value == null) {
+ return null;
+ }
+
if (value instanceof Temporal) {
// jackson not support jdk8 new time api
return value.toString();
- } else if (value instanceof Map) {
+ }
+
+ if (value instanceof Map) {
for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
((Map) value).put(entry.getKey(), convertValue(fieldName,
entry.getValue()));
}
return value;
- } else if (value instanceof List) {
+ }
+
+ if (value instanceof List) {
for (int i = 0; i < ((List) value).size(); i++) {
((List) value).set(i, convertValue(fieldName, ((List)
value).get(i)));
}
return value;
- } else if (value instanceof ByteBuffer) {
- // Check if this field is configured as a vectorization field
- if (vectorizationFields != null &&
vectorizationFields.contains(fieldName)) {
- ByteBuffer buffer = (ByteBuffer) value;
- Float[] floats = VectorUtils.toFloatArray(buffer);
-
- // Use the configured dimension or calculate it from the
buffer size
- int dimension = vectorDimension > 0 ? vectorDimension :
buffer.remaining() / 4;
-
- // Read the floats from the buffer
- for (int i = 0; i < dimension && buffer.remaining() >= 4; i++)
{
- floats[i] = buffer.getFloat();
- }
+ }
- return floats;
- } else {
- // Default behavior for ByteBuffer fields not specified as
vectorization fields
- ByteBuffer buffer = (ByteBuffer) value;
- Float[] floats = VectorUtils.toFloatArray(buffer);
- int floatCount = buffer.remaining() / 4;
+ if (value instanceof ByteBuffer) {
+ ByteBuffer buffer = (ByteBuffer) value;
+ Float[] floats = VectorUtils.toFloatArray(buffer);
- for (int i = 0; i < floatCount; i++) {
- floats[i] = buffer.getFloat();
- }
+ // Use configured dimension for vectorization fields, otherwise
calculate from buffer
+ int dimension =
+ (vectorizationFields != null
+ && vectorizationFields.contains(fieldName)
+ && vectorDimension > 0)
+ ? vectorDimension
+ : buffer.remaining() / 4;
- return floats;
+ for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) {
+ floats[i] = buffer.getFloat();
}
- } else {
- return value;
+
+ return floats;
}
+
+ return value;
}
private Map<String, String> createMetadata(@NonNull SeaTunnelRow row,
@NonNull String key) {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index 2131bdc942..6f95a3706d 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -31,6 +31,7 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -184,4 +185,33 @@ public class ElasticsearchRowSerializerTest {
String upsertStr = serializer.serializeRow(row);
Assertions.assertEquals(expected, upsertStr);
}
+
+ @Test
+ public void testSerializeLocalDateTimeFieldFormat() {
+ String index = "st_index";
+ Map<String, Object> confMap = new HashMap<>();
+ confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+
+ ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
+ ElasticsearchClusterInfo clusterInfo =
+
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
+ IndexInfo indexInfo = new IndexInfo(index, pluginConf);
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(
+ new String[] {"id", "ts"},
+ new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
+
+ final ElasticsearchRowSerializer serializer =
+ new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
+
+ String id = "0001";
+ LocalDateTime ts = LocalDateTime.of(2023, 1, 2, 3, 4, 5);
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, ts});
+ row.setRowKind(RowKind.UPDATE_AFTER);
+
+ String result = serializer.serializeRow(row);
+ Assertions.assertTrue(
+ result.contains("\"ts\":\"2023-01-02T03:04:05\""),
+ "LocalDateTime field should be formatted with ISO-8601 'T'
separator");
+ }
}