ruanhang1993 commented on code in PR #3753:
URL: https://github.com/apache/flink-cdc/pull/3753#discussion_r1855707925
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -323,4 +335,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
pipelineConfigNode, new TypeReference<Map<String, String>>() {});
return Configuration.fromMap(pipelineConfigMap);
}
+
+ private List<ModelDef> parseModels(JsonNode modelsNode) {
+ List<ModelDef> modelDefs = new ArrayList<>();
+ if (modelsNode != null && modelsNode.isArray()) {
Review Comment:
Should we throw an exception when the modelsNode is not an array?
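For illustration, a fail-fast version could look like the sketch below. This is simplified on purpose: a plain `List` stands in for Jackson's `ArrayNode`, and `ModelDef` is reduced to a name holder, so the shape of the check is the point rather than the exact API.

```java
import java.util.ArrayList;
import java.util.List;

public class ModelParsingSketch {
    // Minimal stand-in for the real ModelDef; name only, for illustration.
    public static final class ModelDef {
        final String name;
        ModelDef(String name) { this.name = name; }
    }

    // Sketch of the suggested behavior: a missing `models` node yields an
    // empty list, but a node of the wrong shape fails fast instead of being
    // silently ignored. The real code would test modelsNode.isArray().
    public static List<ModelDef> parseModels(Object modelsNode) {
        List<ModelDef> modelDefs = new ArrayList<>();
        if (modelsNode == null) {
            return modelDefs;
        }
        if (!(modelsNode instanceof List)) {
            throw new IllegalArgumentException(
                    "`models` must be an array, but was: "
                            + modelsNode.getClass().getSimpleName());
        }
        for (Object entry : (List<?>) modelsNode) {
            modelDefs.add(new ModelDef(String.valueOf(entry)));
        }
        return modelDefs;
    }
}
```

With this shape, a mistyped YAML `models` section surfaces as a clear parse-time error instead of an empty model list.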
##########
docs/content/docs/core-concept/transform.md:
##########
@@ -356,6 +356,55 @@ transform:
filter: inc(id) < 100
```
+## Embedding AI Model
+
+An Embedding AI Model can be used in transform rules.
+
+How to define an Embedding AI Model:
+
+```yaml
+pipeline:
+ models:
+ - name: GET_EMBEDDING
+ model: OpenAIEmbeddingModel
+ OpenAI.host: https://xxxx
+ OpenAI.apiKey: abcd1234
+```
+Note:
+* `name` is a required parameter, which represents the function name called in `projection` or `filter`.
+* `model` is a required parameter; available values can be found in [All Support models](#all-support-models).
+* `OpenAI.host` and `OpenAI.apiKey` are optional parameters defined by the specific model.
+
+How to use an Embedding AI Model:
+
+```yaml
+transform:
+ - source-table: db.\.*
+ projection: "*, inc(inc(inc(id))) as inc_id, GET_EMBEDDING(page) as summary"
+ filter: inc(id) < 100
+```
+
+### All Support models
+
+The following built-in models are provided:
+
+#### OpenAIChatModel
Review Comment:
Please add examples for both models.
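For instance, the docs could show both models side by side. A sketch of what such an example might look like (the `OpenAI.model-name` values below are placeholders, and the keys follow the parameter tables in this PR):

```yaml
pipeline:
  models:
    - name: CHAT
      model: OpenAIChatModel
      OpenAI.model-name: gpt-4o-mini        # placeholder value
      OpenAI.host: https://xxxx
      OpenAI.apiKey: abcd1234
      OpenAI.chat.promote: "Summarize:"     # optional
    - name: GET_EMBEDDING
      model: OpenAIEmbeddingModel
      OpenAI.model-name: text-embedding-ada-002
      OpenAI.host: https://xxxx
      OpenAI.apiKey: abcd1234
```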
##########
flink-cdc-runtime/pom.xml:
##########
@@ -26,6 +26,9 @@ limitations under the License.
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-cdc-runtime</artifactId>
+ <properties>
+ <langchain4j.version>0.23.0</langchain4j.version>
Review Comment:
Add some blank spaces.
##########
docs/content/docs/core-concept/transform.md:
##########
@@ -356,6 +356,55 @@ transform:
filter: inc(id) < 100
```
+## Embedding AI Model
+
+An Embedding AI Model can be used in transform rules.
+
+How to define an Embedding AI Model:
+
+```yaml
+pipeline:
+ models:
+ - name: GET_EMBEDDING
+ model: OpenAIEmbeddingModel
+ OpenAI.host: https://xxxx
+ OpenAI.apiKey: abcd1234
+```
+Note:
+* `name` is a required parameter, which represents the function name called in `projection` or `filter`.
+* `model` is a required parameter; available values can be found in [All Support models](#all-support-models).
+* `OpenAI.host` and `OpenAI.apiKey` are optional parameters defined by the specific model.
+
+How to use an Embedding AI Model:
+
+```yaml
+transform:
+ - source-table: db.\.*
+ projection: "*, inc(inc(inc(id))) as inc_id, GET_EMBEDDING(page) as summary"
+ filter: inc(id) < 100
+```
+
+### All Support models
+
+The following built-in models are provided:
+
+#### OpenAIChatModel
+
+| parameter           | type   | optional/required | meaning                                       |
+|---------------------|--------|-------------------|-----------------------------------------------|
+| OpenAI.model-name   | STRING | required          | Name of the model to be called.               |
+| OpenAI.host         | STRING | required          | Host of the model server to be connected.     |
+| OpenAI.apiKey       | STRING | required          | API key for verification of the model server. |
+| OpenAI.chat.promote | STRING | optional          | Prompt for chatting with OpenAI.              |
+
+#### OpenAIEmbeddingModel
+
+| parameter         | type   | optional/required | meaning                         |
+|-------------------|--------|-------------------|---------------------------------|
+| OpenAI.model-name | STRING | required          | Name of the model to be called. |
Review Comment:
The parameter `OpenAI.model-name` seems to be required in both models, but it is not used in the previous example. Is it a required parameter or not?
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,9 +99,27 @@ public DataStream<Event> translatePostTransform(
postTransformFunctionBuilder.addTimezone(timezone);
postTransformFunctionBuilder.addUdfFunctions(
udfFunctions.stream()
- .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
+ .map(this::udfDefToNameAndClasspathTuple)
+ .collect(Collectors.toList()));
+ postTransformFunctionBuilder.addUdfFunctions(
+ models.stream()
+ .map(this::modelToNameAndClasspathTuple)
.collect(Collectors.toList()));
return input.transform(
"Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
}
+
+ private Tuple2<String, String> modelToNameAndClasspathTuple(ModelDef model) {
Review Comment:
The name of this method should be adjusted; there is no classpath in the `ModelDef`.
##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/data/binary/BinarySegmentUtilsTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data.binary;
+
+import org.apache.flink.cdc.common.data.util.BinaryRecordDataDataUtil;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.junit.Test;
+
+import static org.apache.flink.cdc.common.data.util.BinaryRecordDataDataUtil.BYTE_ARRAY_BASE_OFFSET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BinarySegmentUtils}. */
+public class BinarySegmentUtilsTest {
+ @Test
+ public void testCopy() {
+ // test copy the content of the latter Seg
+ MemorySegment[] segments = new MemorySegment[2];
+ segments[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
+ segments[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
+
+ byte[] bytes = BinarySegmentUtils.copyToBytes(segments, 4, 2);
+ assertThat(bytes).isEqualTo(new byte[] {12, 15});
+ }
+
+ @Test
+ public void testEquals() {
+ // test copy the content of the latter Seg
+ MemorySegment[] segments1 = new MemorySegment[3];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
+ segments1[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
+ segments1[2] = MemorySegmentFactory.wrap(new byte[] {1, 1, 1});
+
+ MemorySegment[] segments2 = new MemorySegment[2];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[] {6, 0, 2, 5});
+ segments2[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15, 18});
+
+ assertThat(BinarySegmentUtils.equalsMultiSegments(segments1, 0, segments2, 0, 0)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 3)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 6)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 7)).isFalse();
+ }
+
+ @Test
+ public void testBoundaryByteArrayEquals() {
+ byte[] bytes1 = new byte[5];
+ bytes1[3] = 81;
+ byte[] bytes2 = new byte[100];
+ bytes2[3] = 81;
+ bytes2[4] = 81;
+
+ assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 4)).isTrue();
+ assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 5)).isFalse();
+ assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 0)).isTrue();
+ }
+
+ // @Test
+ // public void testBoundaryEquals() {
+ //     BinaryRecordData row24 = DataFormatTestUtil.get24BytesBinaryRow();
+ //     BinaryRecordData row160 = DataFormatTestUtil.get160BytesBinaryRow();
+ //     BinaryRecordData varRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
+ //     BinaryRecordData varRow160InOne =
+ //             DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160);
+ //
+ //     assertThat(varRow160InOne).isEqualTo(row160);
+ //     assertThat(varRow160InOne).isEqualTo(varRow160);
+ //     assertThat(varRow160).isEqualTo(row160);
+ //     assertThat(varRow160).isEqualTo(varRow160InOne);
+ //
+ //     assertThat(row160).isNotEqualTo(row24);
+ //     assertThat(varRow160).isNotEqualTo(row24);
+ //     assertThat(varRow160InOne).isNotEqualTo(row24);
+ //
+ //     assertThat(BinarySegmentUtils.equals(row24.getSegments(), 0, row160.getSegments(), 0, 0))
+ //             .isTrue();
+ //     assertThat(BinarySegmentUtils.equals(row24.getSegments(), 0, varRow160.getSegments(), 0, 0))
+ //             .isTrue();
+ //
+ //     // test var segs
+ //     MemorySegment[] segments1 = new MemorySegment[2];
+ //     segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ //     segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ //     MemorySegment[] segments2 = new MemorySegment[3];
+ //     segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+ //     segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+ //     segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+ //
+ //     segments1[0].put(9, (byte) 1);
+ //     assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14)).isFalse();
+ //     segments2[1].put(7, (byte) 1);
+ //     assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14)).isTrue();
+ //     assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 16, 14)).isTrue();
+ //     assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 16, 16)).isTrue();
+ //
+ //     segments2[2].put(7, (byte) 1);
+ //     assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 32, 14)).isTrue();
+ // }
Review Comment:
Delete the useless test.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.langchain4j.data.message.UserMessage;
+import dev.langchain4j.model.openai.OpenAiChatModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_CHAT_PROMOTE;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_HOST;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_MODEL_NAME;
+
+/**
+ * A {@link BuiltInModel} that uses a model defined by OpenAI to generate text; refer to the
+ * <a href="https://docs.langchain4j.dev/integrations/language-models/open-ai/">docs</a>.
+ */
+public class OpenAIChatModel implements BuiltInModel, UserDefinedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OpenAIChatModel.class);
+
+ private OpenAiChatModel chatModel;
+
+ private String host;
+
+ private String apiKey;
+
+ private String modelName;
+
+ private String promote;
+
+ public void configure(Configuration modelOptions) {
Review Comment:
Add some checks for the required options.
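One way to do this, sketched with a plain `Map` standing in for Flink CDC's `Configuration` (the option keys mirror `ModelOptions` in this PR, and the helper name is hypothetical):

```java
import java.util.Map;

public class ModelOptionChecks {
    // Fail fast when a required model option is missing or blank, naming the
    // offending key so the pipeline author can fix the YAML directly.
    public static String requireOption(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Required model option `" + key + "` is not set.");
        }
        return value;
    }
}
```

`configure` could then fetch `OpenAI.host`, `OpenAI.apiKey`, and (if it is indeed required) `OpenAI.model-name` through this helper instead of reading them unchecked.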
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_HOST;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_MODEL_NAME;
+
+/**
+ * A {@link BuiltInModel} that uses a model defined by OpenAI to generate vector data; refer to the
+ * <a href="https://docs.langchain4j.dev/integrations/language-models/open-ai/">docs</a>.
+ */
+public class OpenAIEmbeddingModel implements BuiltInModel, UserDefinedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OpenAIEmbeddingModel.class);
+
+ private String host;
+
+ private String apiKey;
+
+ private String modelName;
+
+ private OpenAiEmbeddingModel embeddingModel;
+
+ public void configure(Configuration modelOptions) {
+ this.modelName = modelOptions.get(OPENAI_MODEL_NAME);
+ if (modelName == null) {
+ modelName = "text-embedding-ada-002";
+ }
+ this.host = modelOptions.get(OPENAI_HOST);
+ this.apiKey = modelOptions.get(OPENAI_API_KEY);
Review Comment:
Add some checks for the required options.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -539,6 +541,10 @@ private void initializeUdf() {
                            // into UserDefinedFunction interface, thus the provided UDF classes
                            // might not be compatible with the interface definition in CDC common.
                            Object udfInstance = udfFunctionInstances.get(udf.getName());
+                            if (udfInstance instanceof BuiltInModel) {
+                                ((BuiltInModel) udfInstance)
+                                        .configure(Configuration.fromMap(udf.getParameters()));
Review Comment:
Supporting configuration is also a feature for UDFs in general; the `configure` method should be declared in the interface `UserDefinedFunction` instead of `BuiltInModel`.
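One possible shape for that change: give the interface a no-op `default` method, so existing UDFs are untouched while model-backed UDFs override it. A sketch under those assumptions (a `Map` stands in for `Configuration`, and the interface is pared down to the method under discussion):

```java
import java.util.Map;

public class ConfigurableUdfSketch {
    // Pared-down stand-in for the real UserDefinedFunction interface:
    // configure() has a no-op default, so plain UDFs need no changes.
    public interface UserDefinedFunction {
        default void configure(Map<String, String> parameters) {
            // ordinary UDFs ignore configuration
        }
    }

    // A model-backed UDF opts in by overriding configure().
    public static class EmbeddingUdf implements UserDefinedFunction {
        public String host;

        @Override
        public void configure(Map<String, String> parameters) {
            this.host = parameters.get("OpenAI.host");
        }
    }
}
```

The operator could then call `configure` unconditionally on every UDF instance and drop the `instanceof BuiltInModel` branch entirely.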
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]