This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a1ce97155f [Feature][Connectors-V2][Elasticsearch] Support vector 
transformation sink (#9330)
a1ce97155f is described below

commit a1ce97155f1e69bbb301e7c6fe1b069e89056d3e
Author: SEZ <[email protected]>
AuthorDate: Mon May 26 10:56:22 2025 +0800

    [Feature][Connectors-V2][Elasticsearch] Support vector transformation sink 
(#9330)
---
 docs/en/connector-v2/sink/Elasticsearch.md         | 25 ++++++-
 docs/zh/connector-v2/sink/Elasticsearch.md         | 23 ++++++-
 .../config/ElasticsearchSinkOptions.java           | 14 ++++
 .../serialize/ElasticsearchRowSerializer.java      | 53 ++++++++++++--
 .../sink/ElasticsearchSinkWriter.java              | 26 ++++++-
 .../connector/elasticsearch/ElasticsearchIT.java   | 45 ++++++++++++
 .../fake-to-elasticsearch-vector.conf              | 80 ++++++++++++++++++++++
 7 files changed, 257 insertions(+), 9 deletions(-)

diff --git a/docs/en/connector-v2/sink/Elasticsearch.md 
b/docs/en/connector-v2/sink/Elasticsearch.md
index 362866db67..8188674fbe 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -21,7 +21,7 @@ Engine Supported
 
 ## Options
 
-|          name           |  type   | required |        default value         |
+| name                    | type    | required |        default value         |
 |-------------------------|---------|----------|------------------------------|
 | hosts                   | array   | yes      | -                            |
 | index                   | string  | yes      | -                            |
@@ -41,7 +41,9 @@ Engine Supported
 | tls_truststore_path     | string  | no       | -                            |
 | tls_truststore_password | string  | no       | -                            |
 | common-options          |         | no       | -                            |
+| vectorization_fields    | array   | no       | -                            |
+| vector_dimensions       | int     | no       | -                            |
 
 ### hosts [array]
 
 `Elasticsearch` cluster http address, the format is `host:port` , allowing 
multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
@@ -75,6 +76,12 @@ x-pack password
 
 one bulk request max try size
 
+### vectorization_fields [array]
+Fields to be converted into embedding vectors.
+
+### vector_dimensions [int]
+Dimension (number of float values) of the embedding vectors.
+
 ### max_batch_size [int]
 
 batch bulk doc max size
@@ -150,6 +157,20 @@ sink {
 }
 ```
 
+vector-field writing
+
+```conf
+sink {
+    Elasticsearch {
+        hosts = ["localhost:9200"]
+        index = "${table_name}"
+        schema_save_mode="IGNORE"
+        vectorization_fields = ["review_embedding"]
+        vector_dimensions = 1024
+    }
+}
+```
+
 CDC(Change data capture) event
 
 ```conf
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md 
b/docs/zh/connector-v2/sink/Elasticsearch.md
index 55479940c4..fd463e5a3b 100644
--- a/docs/zh/connector-v2/sink/Elasticsearch.md
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -21,7 +21,7 @@ import ChangeLog from 
'../changelog/connector-elasticsearch.md';
 
 ## 选项
 
-|           名称            |   类型    | 是否必须 |             默认值              |
+|           名称            | 类型      | 是否必须 |             默认值              |
 |-------------------------|---------|------|------------------------------|
 | hosts                   | array   | 是    | -                            |
 | index                   | string  | 是    | -                            |
@@ -41,6 +41,8 @@ import ChangeLog from 
'../changelog/connector-elasticsearch.md';
 | tls_truststore_path     | string  | 否    | -                            |
 | tls_truststore_password | string  | 否    | -                            |
 | common-options          |         | 否    | -                            |
+| vectorization_fields    | array   | 否    | -                            |
+| vector_dimensions       | int     | 否    | -                            |
 
 ### hosts [array]
 
@@ -74,6 +76,12 @@ x-pack 密码
 
 批次批量请求最大尝试大小
 
+### vectorization_fields [array]
+需要向量转换的字段名
+
+### vector_dimensions [int]
+向量维度
+
 ### max_batch_size [int]
 
 批次批量文档最大大小
@@ -148,6 +156,19 @@ sink {
     }
 }
 ```
+向量转换
+
+```conf
+sink {
+    Elasticsearch {
+        hosts = ["localhost:9200"]
+        index = "${table_name}"
+        schema_save_mode="IGNORE"
+        vectorization_fields = ["review_embedding"]
+        vector_dimensions = 1024
+    }
+}
+```
 
 变更数据捕获 (Change data capture) 事件
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
index 22ab5e0d51..0e8b0d92bf 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -76,4 +76,18 @@ public class ElasticsearchSinkOptions extends 
ElasticsearchBaseOptions {
                             Arrays.asList(DROP_DATA, APPEND_DATA, 
ERROR_WHEN_DATA_EXISTS))
                     .defaultValue(APPEND_DATA)
                     .withDescription("data_save_mode");
+
+    public static final Option<List<String>> VECTORIZATION_FIELDS =
+            Options.key("vectorization_fields")
+                    .listType(String.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "List of field names that contain embedding 
vectors (ByteBuffer)");
+
+    public static final Option<Integer> VECTOR_DIMENSIONS =
+            Options.key("vector_dimensions")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Default dimension for vector fields (number of 
floats in the vector)");
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 18ab0ae812..20ba92c4f6 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.BufferUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -34,7 +35,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.In
 
 import lombok.NonNull;
 
+import java.nio.ByteBuffer;
 import java.time.temporal.Temporal;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,10 +53,23 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
     private final IndexTypeSerializer indexTypeSerializer;
     private final Function<SeaTunnelRow, String> keyExtractor;
 
+    // Configuration for vectorization fields
+    private final List<String> vectorizationFields;
+    private final int vectorDimension;
+
     public ElasticsearchRowSerializer(
             ElasticsearchClusterInfo elasticsearchClusterInfo,
             IndexInfo indexInfo,
             SeaTunnelRowType seaTunnelRowType) {
+        this(elasticsearchClusterInfo, indexInfo, seaTunnelRowType, 
Collections.emptyList(), 0);
+    }
+
+    public ElasticsearchRowSerializer(
+            ElasticsearchClusterInfo elasticsearchClusterInfo,
+            IndexInfo indexInfo,
+            SeaTunnelRowType seaTunnelRowType,
+            List<String> vectorizationFields,
+            int vectorDimension) {
         this.indexTypeSerializer =
                 IndexTypeSerializerFactory.getIndexTypeSerializer(
                         elasticsearchClusterInfo, indexInfo.getType());
@@ -63,6 +79,8 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
         this.keyExtractor =
                 KeyExtractor.createKeyExtractor(
                         seaTunnelRowType, indexInfo.getPrimaryKeys(), 
indexInfo.getKeyDelimiter());
+        this.vectorizationFields = vectorizationFields;
+        this.vectorDimension = vectorDimension;
     }
 
     @Override
@@ -176,26 +194,53 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
                         toDocumentMap(
                                 (SeaTunnelRow) value, (SeaTunnelRowType) 
rowType.getFieldType(i)));
             } else {
-                doc.put(fieldNames[i], convertValue(value));
+                doc.put(fieldNames[i], convertValue(fieldNames[i], value));
             }
         }
         return doc;
     }
 
-    private Object convertValue(Object value) {
+    private Object convertValue(String fieldName, Object value) {
         if (value instanceof Temporal) {
             // jackson not support jdk8 new time api
             return value.toString();
         } else if (value instanceof Map) {
             for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
-                ((Map) value).put(entry.getKey(), 
convertValue(entry.getValue()));
+                ((Map) value).put(entry.getKey(), convertValue(fieldName, 
entry.getValue()));
             }
             return value;
         } else if (value instanceof List) {
             for (int i = 0; i < ((List) value).size(); i++) {
-                ((List) value).set(i, convertValue(((List) value).get(i)));
+                ((List) value).set(i, convertValue(fieldName, ((List) 
value).get(i)));
             }
             return value;
+        } else if (value instanceof ByteBuffer) {
+            // Check if this field is configured as a vectorization field
+            if (vectorizationFields != null && 
vectorizationFields.contains(fieldName)) {
+                ByteBuffer buffer = (ByteBuffer) value;
+                Float[] floats = BufferUtils.toFloatArray(buffer);
+
+                // Use the configured dimension or calculate it from the 
buffer size
+                int dimension = vectorDimension > 0 ? vectorDimension : 
buffer.remaining() / 4;
+
+                // Read the floats from the buffer
+                for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) 
{
+                    floats[i] = buffer.getFloat();
+                }
+
+                return floats;
+            } else {
+                // Default behavior for ByteBuffer fields not specified as 
vectorization fields
+                ByteBuffer buffer = (ByteBuffer) value;
+                Float[] floats = BufferUtils.toFloatArray(buffer);
+                int floatCount = buffer.remaining() / 4;
+
+                for (int i = 0; i < floatCount; i++) {
+                    floats[i] = buffer.getFloat();
+                }
+
+                return floats;
+            }
         } else {
             return value;
         }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index b5da93d37b..3b897de6ea 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
@@ -53,6 +54,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -77,6 +79,7 @@ public class ElasticsearchSinkWriter
     private final IndexInfo indexInfo;
     private TableSchema tableSchema;
     private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
+    private final ReadonlyConfig config;
 
     public ElasticsearchSinkWriter(
             Context context,
@@ -86,15 +89,25 @@ public class ElasticsearchSinkWriter
             int maxRetryCount) {
         this.context = context;
         this.maxBatchSize = maxBatchSize;
+        this.config = config;
 
         this.indexInfo =
                 new 
IndexInfo(catalogTable.getTableId().getTableName().toLowerCase(), config);
         esRestClient = EsRestClient.createInstance(config);
+
+        // Get vectorization fields and dimension from config
+        List<String> vectorizationFields =
+                
config.getOptional(ElasticsearchSinkOptions.VECTORIZATION_FIELDS)
+                        .orElse(Collections.emptyList());
+        int vectorDimension = 
config.get(ElasticsearchSinkOptions.VECTOR_DIMENSIONS);
+
         this.seaTunnelRowSerializer =
                 new ElasticsearchRowSerializer(
                         esRestClient.getClusterInfo(),
                         indexInfo,
-                        catalogTable.getSeaTunnelRowType());
+                        catalogTable.getSeaTunnelRowType(),
+                        vectorizationFields,
+                        vectorDimension);
 
         this.requestEsList = new ArrayList<>(maxBatchSize);
         this.retryMaterial =
@@ -129,11 +142,20 @@ public class ElasticsearchSinkWriter
         }
 
         this.tableSchema = 
tableSchemaChangeEventHandler.reset(tableSchema).apply(event);
+
+        // Get vectorization fields and dimension from config
+        List<String> vectorizationFields =
+                
config.getOptional(ElasticsearchSinkOptions.VECTORIZATION_FIELDS)
+                        .orElse(Collections.emptyList());
+        int vectorDimension = 
config.get(ElasticsearchSinkOptions.VECTOR_DIMENSIONS);
+
         this.seaTunnelRowSerializer =
                 new ElasticsearchRowSerializer(
                         esRestClient.getClusterInfo(),
                         indexInfo,
-                        tableSchema.toPhysicalRowDataType());
+                        tableSchema.toPhysicalRowDataType(),
+                        vectorizationFields,
+                        vectorDimension);
     }
 
     private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 37de01a474..1831496e5b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -37,7 +37,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.apache.commons.io.IOUtils;
@@ -316,6 +318,49 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK not support adapt")
+    public void testElasticsearchWithVector(TestContainer container)
+            throws IOException, InterruptedException {
+        String mapping =
+                "{\n"
+                        + "  \"mappings\": {\n"
+                        + "    \"properties\": {\n"
+                        + "      \"review_id\": {\"type\": \"long\"},\n"
+                        + "      \"review_embedding\": {\n"
+                        + "        \"type\": \"dense_vector\",\n"
+                        + "        \"dims\": 1024\n"
+                        + "      },\n"
+                        + "      \"review_text\": {\"type\": \"text\"},\n"
+                        + "      \"review_score\": {\"type\": \"float\"}\n"
+                        + "    }\n"
+                        + "  }\n"
+                        + "}";
+
+        // create index
+        esRestClient.createIndex("vector_test", mapping);
+        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+
+        Container.ExecResult execResult =
+                
container.executeJob("/elasticsearch/fake-to-elasticsearch-vector.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // Wait for index refresh
+        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+
+        // Verify that 10 documents were inserted as specified in the config
+        Assertions.assertEquals(
+                10, 
esRestClient.getIndexDocsCount("vector_test").get(0).getDocsCount());
+
+        // Verify vector field exists in the mapping
+        Map<String, BasicTypeDefine<EsType>> fieldTypes =
+                esRestClient.getFieldTypeMapping("vector_test", 
Collections.emptyList());
+        Assertions.assertTrue(fieldTypes.containsKey("review_embedding"));
+    }
+
     @TestTemplate
     public void testElasticsearchWithPIT(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fake-to-elasticsearch-vector.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fake-to-elasticsearch-vector.conf
new file mode 100644
index 0000000000..7f6a78c00d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fake-to-elasticsearch-vector.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+      row.num = 10
+      vector.dimension = 1024
+      schema = {
+           table = "vector_test"
+           columns = [
+           {
+              name = review_id
+              type = bigint
+              nullable = false
+              defaultValue = 0
+              comment = "primary key id"
+           },
+           {
+              name = review_embedding
+              type = float_vector
+              columnScale = 1024
+              comment = "vector embedding"
+           },
+           {
+              name = review_text
+              type = string
+              nullable = true
+              comment = "review content"
+           },
+           {
+              name = review_score
+              type = float
+              nullable = true
+              comment = "review score"
+           }
+       ]
+        primaryKey {
+            name = review_id
+            columnNames = [review_id]
+        }
+      }
+  }
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+    
+    index = "${table_name}"
+    schema_save_mode = "IGNORE"
+    data_save_mode = "APPEND_DATA"
+    
+    # Vector configuration
+    vectorization_fields = ["review_embedding"]
+    vector_dimensions = 1024
+  }
+}

Reply via email to