This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a1ce97155f [Feature][Connectors-V2][Elasticsearch] Support vector
transformation sink (#9330)
a1ce97155f is described below
commit a1ce97155f1e69bbb301e7c6fe1b069e89056d3e
Author: SEZ <[email protected]>
AuthorDate: Mon May 26 10:56:22 2025 +0800
[Feature][Connectors-V2][Elasticsearch] Support vector transformation sink
(#9330)
---
docs/en/connector-v2/sink/Elasticsearch.md | 25 ++++++-
docs/zh/connector-v2/sink/Elasticsearch.md | 23 ++++++-
.../config/ElasticsearchSinkOptions.java | 14 ++++
.../serialize/ElasticsearchRowSerializer.java | 53 ++++++++++++--
.../sink/ElasticsearchSinkWriter.java | 26 ++++++-
.../connector/elasticsearch/ElasticsearchIT.java | 45 ++++++++++++
.../fake-to-elasticsearch-vector.conf | 80 ++++++++++++++++++++++
7 files changed, 257 insertions(+), 9 deletions(-)
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md
b/docs/en/connector-v2/sink/Elasticsearch.md
index 362866db67..8188674fbe 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -21,7 +21,7 @@ Engine Supported
## Options
-| name | type | required | default value |
+| name | type | required | default value |
|-------------------------|---------|----------|------------------------------|
| hosts | array | yes | - |
| index | string | yes | - |
@@ -41,7 +41,8 @@ Engine Supported
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| common-options | | no | - |
-
+| vectorization_fields | array | no | - |
+| vector_dimensions | int | no | - |
### hosts [array]
`Elasticsearch` cluster http address, the format is `host:port` , allowing
multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.
@@ -75,6 +76,12 @@ x-pack password
one bulk request max try size
+### vectorization_fields [array]
+List of field names whose values will be converted into embedding vectors
+
+### vector_dimensions [int]
+Dimension (number of floats) of the embedding vectors
+
### max_batch_size [int]
batch bulk doc max size
@@ -150,6 +157,20 @@ sink {
}
```
+vector-field writing
+
+```conf
+sink {
+ Elasticsearch {
+ hosts = ["localhost:9200"]
+ index = "${table_name}"
+ schema_save_mode="IGNORE"
+ vectorization_fields = ["review_embedding"]
+ vector_dimensions = 1024
+ }
+}
+```
+
CDC(Change data capture) event
```conf
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md
b/docs/zh/connector-v2/sink/Elasticsearch.md
index 55479940c4..fd463e5a3b 100644
--- a/docs/zh/connector-v2/sink/Elasticsearch.md
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -21,7 +21,7 @@ import ChangeLog from
'../changelog/connector-elasticsearch.md';
## 选项
-| 名称 | 类型 | 是否必须 | 默认值 |
+| 名称 | 类型 | 是否必须 | 默认值 |
|-------------------------|---------|------|------------------------------|
| hosts | array | 是 | - |
| index | string | 是 | - |
@@ -41,6 +41,8 @@ import ChangeLog from
'../changelog/connector-elasticsearch.md';
| tls_truststore_path | string | 否 | - |
| tls_truststore_password | string | 否 | - |
| common-options | | 否 | - |
+| vectorization_fields | array | 否 | - |
+| vector_dimensions | int | 否 | - |
### hosts [array]
@@ -74,6 +76,12 @@ x-pack 密码
批次批量请求最大尝试大小
+### vectorization_fields [array]
+需要向量转换的字段名
+
+### vector_dimensions [int]
+向量维度
+
### max_batch_size [int]
批次批量文档最大大小
@@ -148,6 +156,19 @@ sink {
}
}
```
+向量转换
+
+```conf
+sink {
+ Elasticsearch {
+ hosts = ["localhost:9200"]
+ index = "${table_name}"
+ schema_save_mode="IGNORE"
+ vectorization_fields = ["review_embedding"]
+ vector_dimensions = 1024
+ }
+}
+```
变更数据捕获 (Change data capture) 事件
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
index 22ab5e0d51..0e8b0d92bf 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -76,4 +76,18 @@ public class ElasticsearchSinkOptions extends
ElasticsearchBaseOptions {
Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription("data_save_mode");
+
+ public static final Option<List<String>> VECTORIZATION_FIELDS =
+ Options.key("vectorization_fields")
+ .listType(String.class)
+ .noDefaultValue()
+ .withDescription(
+ "List of field names that contain embedding
vectors (ByteBuffer)");
+
+ public static final Option<Integer> VECTOR_DIMENSIONS =
+ Options.key("vector_dimensions")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Default dimension for vector fields (number of
floats in the vector)");
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index 18ab0ae812..20ba92c4f6 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.BufferUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -34,7 +35,9 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.In
import lombok.NonNull;
+import java.nio.ByteBuffer;
import java.time.temporal.Temporal;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,10 +53,23 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
private final IndexTypeSerializer indexTypeSerializer;
private final Function<SeaTunnelRow, String> keyExtractor;
+ // Configuration for vectorization fields
+ private final List<String> vectorizationFields;
+ private final int vectorDimension;
+
public ElasticsearchRowSerializer(
ElasticsearchClusterInfo elasticsearchClusterInfo,
IndexInfo indexInfo,
SeaTunnelRowType seaTunnelRowType) {
+ this(elasticsearchClusterInfo, indexInfo, seaTunnelRowType,
Collections.emptyList(), 0);
+ }
+
+ public ElasticsearchRowSerializer(
+ ElasticsearchClusterInfo elasticsearchClusterInfo,
+ IndexInfo indexInfo,
+ SeaTunnelRowType seaTunnelRowType,
+ List<String> vectorizationFields,
+ int vectorDimension) {
this.indexTypeSerializer =
IndexTypeSerializerFactory.getIndexTypeSerializer(
elasticsearchClusterInfo, indexInfo.getType());
@@ -63,6 +79,8 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
this.keyExtractor =
KeyExtractor.createKeyExtractor(
seaTunnelRowType, indexInfo.getPrimaryKeys(),
indexInfo.getKeyDelimiter());
+ this.vectorizationFields = vectorizationFields;
+ this.vectorDimension = vectorDimension;
}
@Override
@@ -176,26 +194,53 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
toDocumentMap(
(SeaTunnelRow) value, (SeaTunnelRowType)
rowType.getFieldType(i)));
} else {
- doc.put(fieldNames[i], convertValue(value));
+ doc.put(fieldNames[i], convertValue(fieldNames[i], value));
}
}
return doc;
}
- private Object convertValue(Object value) {
+ private Object convertValue(String fieldName, Object value) {
if (value instanceof Temporal) {
// jackson not support jdk8 new time api
return value.toString();
} else if (value instanceof Map) {
for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
- ((Map) value).put(entry.getKey(),
convertValue(entry.getValue()));
+ ((Map) value).put(entry.getKey(), convertValue(fieldName,
entry.getValue()));
}
return value;
} else if (value instanceof List) {
for (int i = 0; i < ((List) value).size(); i++) {
- ((List) value).set(i, convertValue(((List) value).get(i)));
+ ((List) value).set(i, convertValue(fieldName, ((List)
value).get(i)));
}
return value;
+ } else if (value instanceof ByteBuffer) {
+ // Check if this field is configured as a vectorization field
+ if (vectorizationFields != null &&
vectorizationFields.contains(fieldName)) {
+ ByteBuffer buffer = (ByteBuffer) value;
+ Float[] floats = BufferUtils.toFloatArray(buffer);
+
+ // Use the configured dimension or calculate it from the
buffer size
+ int dimension = vectorDimension > 0 ? vectorDimension :
buffer.remaining() / 4;
+
+ // Read the floats from the buffer
+ for (int i = 0; i < dimension && buffer.remaining() >= 4; i++)
{
+ floats[i] = buffer.getFloat();
+ }
+
+ return floats;
+ } else {
+ // Default behavior for ByteBuffer fields not specified as
vectorization fields
+ ByteBuffer buffer = (ByteBuffer) value;
+ Float[] floats = BufferUtils.toFloatArray(buffer);
+ int floatCount = buffer.remaining() / 4;
+
+ for (int i = 0; i < floatCount; i++) {
+ floats[i] = buffer.getFloat();
+ }
+
+ return floats;
+ }
} else {
return value;
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index b5da93d37b..3b897de6ea 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
@@ -53,6 +54,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -77,6 +79,7 @@ public class ElasticsearchSinkWriter
private final IndexInfo indexInfo;
private TableSchema tableSchema;
private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
+ private final ReadonlyConfig config;
public ElasticsearchSinkWriter(
Context context,
@@ -86,15 +89,25 @@ public class ElasticsearchSinkWriter
int maxRetryCount) {
this.context = context;
this.maxBatchSize = maxBatchSize;
+ this.config = config;
this.indexInfo =
new
IndexInfo(catalogTable.getTableId().getTableName().toLowerCase(), config);
esRestClient = EsRestClient.createInstance(config);
+
+ // Get vectorization fields and dimension from config
+ List<String> vectorizationFields =
+
config.getOptional(ElasticsearchSinkOptions.VECTORIZATION_FIELDS)
+ .orElse(Collections.emptyList());
+ int vectorDimension =
config.get(ElasticsearchSinkOptions.VECTOR_DIMENSIONS);
+
this.seaTunnelRowSerializer =
new ElasticsearchRowSerializer(
esRestClient.getClusterInfo(),
indexInfo,
- catalogTable.getSeaTunnelRowType());
+ catalogTable.getSeaTunnelRowType(),
+ vectorizationFields,
+ vectorDimension);
this.requestEsList = new ArrayList<>(maxBatchSize);
this.retryMaterial =
@@ -129,11 +142,20 @@ public class ElasticsearchSinkWriter
}
this.tableSchema =
tableSchemaChangeEventHandler.reset(tableSchema).apply(event);
+
+ // Get vectorization fields and dimension from config
+ List<String> vectorizationFields =
+
config.getOptional(ElasticsearchSinkOptions.VECTORIZATION_FIELDS)
+ .orElse(Collections.emptyList());
+ int vectorDimension =
config.get(ElasticsearchSinkOptions.VECTOR_DIMENSIONS);
+
this.seaTunnelRowSerializer =
new ElasticsearchRowSerializer(
esRestClient.getClusterInfo(),
indexInfo,
- tableSchema.toPhysicalRowDataType());
+ tableSchema.toPhysicalRowDataType(),
+ vectorizationFields,
+ vectorDimension);
}
private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 37de01a474..1831496e5b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -37,7 +37,9 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.commons.io.IOUtils;
@@ -316,6 +318,49 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK not support adapt")
+ public void testElasticsearchWithVector(TestContainer container)
+ throws IOException, InterruptedException {
+ String mapping =
+ "{\n"
+ + " \"mappings\": {\n"
+ + " \"properties\": {\n"
+ + " \"review_id\": {\"type\": \"long\"},\n"
+ + " \"review_embedding\": {\n"
+ + " \"type\": \"dense_vector\",\n"
+ + " \"dims\": 1024\n"
+ + " },\n"
+ + " \"review_text\": {\"type\": \"text\"},\n"
+ + " \"review_score\": {\"type\": \"float\"}\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
+ // create index
+ esRestClient.createIndex("vector_test", mapping);
+ Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+
+ Container.ExecResult execResult =
+
container.executeJob("/elasticsearch/fake-to-elasticsearch-vector.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // Wait for index refresh
+ Thread.sleep(INDEX_REFRESH_MILL_DELAY);
+
+ // Verify that 10 documents were inserted as specified in the config
+ Assertions.assertEquals(
+ 10,
esRestClient.getIndexDocsCount("vector_test").get(0).getDocsCount());
+
+ // Verify vector field exists in the mapping
+ Map<String, BasicTypeDefine<EsType>> fieldTypes =
+ esRestClient.getFieldTypeMapping("vector_test",
Collections.emptyList());
+ Assertions.assertTrue(fieldTypes.containsKey("review_embedding"));
+ }
+
@TestTemplate
public void testElasticsearchWithPIT(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fake-to-elasticsearch-vector.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fake-to-elasticsearch-vector.conf
new file mode 100644
index 0000000000..7f6a78c00d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fake-to-elasticsearch-vector.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ vector.dimension = 1024
+ schema = {
+ table = "vector_test"
+ columns = [
+ {
+ name = review_id
+ type = bigint
+ nullable = false
+ defaultValue = 0
+ comment = "primary key id"
+ },
+ {
+ name = review_embedding
+ type = float_vector
+ columnScale = 1024
+ comment = "vector embedding"
+ },
+ {
+ name = review_text
+ type = string
+ nullable = true
+ comment = "review content"
+ },
+ {
+ name = review_score
+ type = float
+ nullable = true
+ comment = "review score"
+ }
+ ]
+ primaryKey {
+ name = review_id
+ columnNames = [review_id]
+ }
+ }
+ }
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "${table_name}"
+ schema_save_mode = "IGNORE"
+ data_save_mode = "APPEND_DATA"
+
+ # Vector configuration
+ vectorization_fields = ["review_embedding"]
+ vector_dimensions = 1024
+ }
+}