yuxiqian commented on code in PR #3753:
URL: https://github.com/apache/flink-cdc/pull/3753#discussion_r1855691583


##########
docs/content/docs/core-concept/transform.md:
##########
@@ -356,6 +356,55 @@ transform:
     filter: inc(id) < 100
 ```
 
+## Embedding AI Model
+
+Embedding AI Model can be used in transform rules.
+
+How to define a Embedding AI Model:
+
+```yaml
+pipeline:
+  models:

Review Comment:
   How about using `model` since other block definitions use singular form?
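   
   For illustration, the singular block might look like this (a sketch only; the keys and values are copied from the example used elsewhere in this review):
   
   ```yaml
   pipeline:
     model:
       - name: GET_EMBEDDING
         model: OpenAIEmbeddingModel
         OpenAI.host: https://xxxx
         OpenAI.apiKey: abcd1234
   ```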



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.definition;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Common properties of model.
+ *
+ * <p>A transformation definition contains:
+ *
+ * <ul>
+ *   <li>name: The name of function.
+ *   <li>name: The model to transform data.
+ *   <li>properties: The parameters that used to configure the model.

Review Comment:
   ```suggestion
    *   <li>name: The name of function.
    *   <li>model: The model to transform data.
    *   <li>parameters: The parameters that used to configure the model.
   ```



##########
flink-cdc-runtime/pom.xml:
##########
@@ -26,6 +26,9 @@ limitations under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>flink-cdc-runtime</artifactId>
+    <properties>
+    <langchain4j.version>0.23.0</langchain4j.version>
+    </properties>

Review Comment:
   ```suggestion
       <properties>
           <langchain4j.version>0.23.0</langchain4j.version>
       </properties>
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorShardingTableITCase.java:
##########
@@ -196,7 +196,7 @@ public void testShardingTablesWithTinyInt1() throws Exception {
                 new String[] {
                     "+I[1, 1]", "+I[2, 0]", "+I[3, 1]", "+I[4, 0]", "+I[5, 
1]", "+I[6, 0]",
                 };
-        List<String> actual = TestValuesTableFactory.getResults("sink");
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");

Review Comment:
   This should have been fixed in master. Please rebase this branch



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+/** Options of {@link BuiltInModel}. */
+public class ModelOptions {
+
+    // Options for Open AI Model.
+    public static final ConfigOption<String> OPENAI_MODEL_NAME =
+            ConfigOptions.key("OpenAI.model-name")

Review Comment:
   There are no other option keys in mixed case. Can we use an `openai.xxx` prefix here?
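   
   For example, the first option could be keyed as follows (a sketch based on the definition shown later in this file, with only the key changed):
   
   ```java
   public static final ConfigOption<String> OPENAI_MODEL_NAME =
           ConfigOptions.key("openai.model-name")
                   .stringType()
                   .noDefaultValue()
                   .withDescription("Name of model to be called.");
   ```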



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.langchain4j.data.message.UserMessage;
+import dev.langchain4j.model.openai.OpenAiChatModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_CHAT_PROMOTE;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_HOST;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_MODEL_NAME;
+
+/**
+ * A {@link BuiltInModel} that use Model defined by OpenAI to generate text, refer to <a
+ * href="https://docs.langchain4j.dev/integrations/language-models/open-ai/">docs</a>}.
+ */
+public class OpenAIChatModel implements BuiltInModel, UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenAIChatModel.class);
+
+    private OpenAiChatModel chatModel;
+
+    private String host;
+
+    private String apiKey;
+
+    private String modelName;
+
+    private String promote;
+
+    public void configure(Configuration modelOptions) {
+        this.modelName = modelOptions.get(OPENAI_MODEL_NAME);
+        this.host = modelOptions.get(OPENAI_HOST);
+        this.apiKey = modelOptions.get(OPENAI_API_KEY);
+        this.promote = modelOptions.get(OPENAI_CHAT_PROMOTE);
+    }
+
+    public String eval(String input) {
+        return chat(input);
+    }
+
+    private String chat(String input) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.warn("Empty or null input provided for embedding.");
+            return "";
+        }
+        if (promote != null) {
+            input = promote + ": " + input;
+        }
+        return chatModel
+                .generate(Collections.singletonList(new UserMessage(input)))
+                .content()
+                .text();
+    }
+
+    @Override
+    public DataType getReturnType() {
+        // This length is an empirical value.
+        return DataTypes.VARCHAR(65535);

Review Comment:
   What about `DataTypes.STRING()`? Limiting maximum length here seems arbitrary.
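   
   I.e., a minimal sketch of the suggested change:
   
   ```java
   @Override
   public DataType getReturnType() {
       return DataTypes.STRING();
   }
   ```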



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/BuiltInModel.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+/**
+ * Model that can be used to transform the record into text or vector. The lifecycle of
+ * BuiltInModel: configure => open => eval => close.
+ */

Review Comment:
   `BuiltInModel` interface itself should extend `UserDefinedFunction`, instead of requiring classes implementing `BuiltInModel` to do so.
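   
   Something like the following (a sketch; the exact method set of `BuiltInModel` is assumed from the lifecycle described in the javadoc):
   
   ```java
   public interface BuiltInModel extends UserDefinedFunction {
   
       /** Configures the model with the options declared in the pipeline definition. */
       void configure(Configuration modelOptions);
   }
   ```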



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -323,4 +335,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
                         pipelineConfigNode, new TypeReference<Map<String, String>>() {});
         return Configuration.fromMap(pipelineConfigMap);
     }
+
+    private List<ModelDef> parseModels(JsonNode modelsNode) {
+        List<ModelDef> modelDefs = new ArrayList<>();
+        if (modelsNode != null && modelsNode.isArray()) {
+            for (JsonNode modelNode : modelsNode) {
+                String name =
+                        checkNotNull(
+                                        modelNode.get(MODEL_NAME_KEY),
+                                        "Missing required field \"%s\" in Model",
+                                        MODEL_NAME_KEY)
+                                .asText();
+                String model =
+                        checkNotNull(
+                                        modelNode.get(MODEL_MODEL_KEY),
+                                        "Missing required field \"%s\" in Model",
+                                        MODEL_MODEL_KEY)
+                                .asText();
+                Map<String, String> properties = mapper.convertValue(modelNode, Map.class);
+                modelDefs.add(new ModelDef(name, model, properties));
+            }
+        }

Review Comment:
   Throw an explicit exception if `modelsNode` is not an array. Otherwise, such an erroneous configuration will silently fail:
   
   ```yaml
   pipeline:
     models:
       name: GET_EMBEDDING
       model: OpenAIEmbeddingModel
       OpenAI.host: https://xxxx
       OpenAI.apiKey: abcd1234
   ```



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,9 +99,27 @@ public DataStream<Event> translatePostTransform(
         postTransformFunctionBuilder.addTimezone(timezone);
         postTransformFunctionBuilder.addUdfFunctions(
                 udfFunctions.stream()
-                        .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
+                        .map(this::udfDefToNameAndClasspathTuple)
+                        .collect(Collectors.toList()));
+        postTransformFunctionBuilder.addUdfFunctions(
+                models.stream()
+                        .map(this::modelToNameAndClasspathTuple)
                         .collect(Collectors.toList()));
         return input.transform(
                 "Transform:Data", new EventTypeInfo(), 
postTransformFunctionBuilder.build());
     }
+
+    private Tuple2<String, String> modelToNameAndClasspathTuple(ModelDef model) {
+        try {
+            // A tricky way to pass parameters to UDF
+            String serializedParams = objectMapper.writeValueAsString(model.getParameters());
+            return Tuple2.of(serializedParams, model.getModel());
+        } catch (Exception e) {
+            throw new IllegalArgumentException("ModelDef is illegal, ModelDef is " + model, e);
+        }
+    }

Review Comment:
   What about widening the data structure (`Tuple2<String, String>`) passed into the Transform node? Such a hack seems unnecessary since we don't need to guarantee backward compatibility of internal APIs.
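   
   For example, a wider tuple could carry the parameters explicitly (a sketch using `org.apache.flink.api.java.tuple.Tuple3`; it assumes a `ModelDef#getName` accessor and that the transform-side builder is adjusted to accept the extra element):
   
   ```java
   private Tuple3<String, String, Map<String, String>> modelToDescriptor(ModelDef model) {
       // Name, classpath and parameters are passed as-is instead of serializing
       // the parameters into the name field.
       return Tuple3.of(model.getName(), model.getModel(), model.getParameters());
   }
   ```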



##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java:
##########
@@ -822,6 +827,77 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 4, Integer: 42, 2-42], after=[2, x, 4, Integer: 42, 2-42], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @MethodSource("testParams")
+    void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*, CHAT(col1) AS fmt",
+                        null,
+                        "col1",
+                        null,
+                        "key1=value1",
+                        "");
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(transformDef),
+                        new ArrayList<>(),
+                        Arrays.asList(
+                                new ModelDef(
+                                        "GET_EMBEDDING",
+                                        "OpenAIChatModel",
+                                        new LinkedHashMap<>(
+                                                ImmutableMap.<String, String>builder()
+                                                        .put("name", "CHAT")
+                                                        .put("model", "OpenAIChatModel")
+                                                        .put("OpenAI.model-name", "gpt-4o-mini")
+                                                        .put(
+                                                                "OpenAI.host",
+                                                                "http://langchain4j.dev/demo/openai/v1")
+                                                        .put("OpenAI.apiKey", "demo")
+                                                        .build()))),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .contains(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}");
+        // The result of transform by model is not fixed.
+        assertThat(outputEvents.length).isEqualTo(9);

Review Comment:
   Assertions could be chained together
   
   ```suggestion
           assertThat(outputEvents)
                   .contains("...")
                   .hasSize(9);
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_HOST;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_MODEL_NAME;
+
+/**
+ * A {@link BuiltInModel} that use Model defined by OpenAI to generate vector data, refer to <a
+ * href="https://docs.langchain4j.dev/integrations/language-models/open-ai/">docs</a>}.
+ */
+public class OpenAIEmbeddingModel implements BuiltInModel, UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenAIEmbeddingModel.class);
+
+    private String host;
+
+    private String apiKey;
+
+    private String modelName;
+
+    private OpenAiEmbeddingModel embeddingModel;
+
+    public void configure(Configuration modelOptions) {
+        this.modelName = modelOptions.get(OPENAI_MODEL_NAME);
+        if (modelName == null) {
+            modelName = "text-embedding-ada-002";
+        }
+        this.host = modelOptions.get(OPENAI_HOST);
+        this.apiKey = modelOptions.get(OPENAI_API_KEY);
+    }
+
+    public ArrayData eval(String input) {
+        return getEmbedding(input);
+    }
+
+    private ArrayData getEmbedding(String input) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.debug("Empty or null input provided for embedding.");
+            return new GenericArrayData(new Float[0]);
+        }
+
+        TextSegment textSegment = new TextSegment(input, new Metadata());
+
+        List<Embedding> embeddings =
+                embeddingModel.embedAll(Collections.singletonList(textSegment)).content();
+
+        if (embeddings != null && !embeddings.isEmpty()) {
+            List<Float> embeddingList = embeddings.get(0).vectorAsList();
+            Float[] embeddingArray = embeddingList.toArray(new Float[0]);
+            return new GenericArrayData(embeddingArray);
+        } else {
+            LOG.warn("No embedding results returned for input: {}", input);
+            return new GenericArrayData(new Float[0]);
+        }
+    }
+
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.ARRAY(DataTypes.FLOAT());
+    }
+
+    @Override
+    public void open() throws Exception {
+        LOG.info("Opening ModelUdf: {}", modelName);
+        this.embeddingModel =
+                OpenAiEmbeddingModel.builder()
+                        .apiKey(apiKey)
+                        .baseUrl(host)
+                        .modelName(modelName)
+                        .build();
+    }

Review Comment:
   Ditto, if there's no `close()`, omit `open()` too to keep semantic consistency.



##########
flink-cdc-runtime/pom.xml:
##########
@@ -89,6 +92,21 @@ limitations under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>dev.langchain4j</groupId>
+            <artifactId>langchain4j</artifactId>
+            <version>${langchain4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>dev.langchain4j</groupId>
+            <artifactId>langchain4j-open-ai</artifactId>
+            <version>${langchain4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.theokanning.openai-gpt3-java</groupId>
+            <artifactId>service</artifactId>
+            <version>0.12.0</version>
+        </dependency>

Review Comment:
   These are heavy dependencies and are not necessary for users who don't use AI models in their pipelines. Can we distribute the AI-model-related functions as individual jar packages, since they can be dynamically registered?
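   
   For instance, the langchain4j dependencies could live in an optional module that is packaged on its own (the module name below is purely hypothetical):
   
   ```xml
   <!-- Hypothetical standalone module, e.g. flink-cdc-pipeline-model-openai -->
   <dependency>
       <groupId>dev.langchain4j</groupId>
       <artifactId>langchain4j-open-ai</artifactId>
       <version>${langchain4j.version}</version>
   </dependency>
   ```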



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+/** Options of {@link BuiltInModel}. */
+public class ModelOptions {
+
+    // Options for Open AI Model.
+    public static final ConfigOption<String> OPENAI_MODEL_NAME =
+            ConfigOptions.key("OpenAI.model-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Name of model to be called.");
+
+    public static final ConfigOption<String> OPENAI_HOST =
+            ConfigOptions.key("OpenAI.host")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Host of the Model server to be connected.");
+
+    public static final ConfigOption<String> OPENAI_API_KEY =
+            ConfigOptions.key("OpenAI.apiKey")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Api Key for verification of the Model server.");
+
+    public static final ConfigOption<String> OPENAI_CHAT_PROMOTE =
+            ConfigOptions.key("OpenAI.chat.promote")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Promote for chat using OpenAI.");

Review Comment:
   Prompt?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.langchain4j.data.message.UserMessage;
+import dev.langchain4j.model.openai.OpenAiChatModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_CHAT_PROMOTE;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_HOST;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_MODEL_NAME;
+
+/**
+ * A {@link BuiltInModel} that use Model defined by OpenAI to generate text, refer to <a
+ * href="https://docs.langchain4j.dev/integrations/language-models/open-ai/">docs</a>}.
+ */
+public class OpenAIChatModel implements BuiltInModel, UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenAIChatModel.class);
+
+    private OpenAiChatModel chatModel;
+
+    private String host;
+
+    private String apiKey;
+
+    private String modelName;
+
+    private String promote;
+
+    public void configure(Configuration modelOptions) {
+        this.modelName = modelOptions.get(OPENAI_MODEL_NAME);
+        this.host = modelOptions.get(OPENAI_HOST);
+        this.apiKey = modelOptions.get(OPENAI_API_KEY);
+        this.promote = modelOptions.get(OPENAI_CHAT_PROMOTE);
+    }
+
+    public String eval(String input) {
+        return chat(input);
+    }
+
+    private String chat(String input) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.warn("Empty or null input provided for embedding.");
+            return "";
+        }
+        if (promote != null) {
+            input = promote + ": " + input;
+        }
+        return chatModel
+                .generate(Collections.singletonList(new UserMessage(input)))
+                .content()
+                .text();
+    }
+
+    @Override
+    public DataType getReturnType() {
+        // This length is an empirical value.
+        return DataTypes.VARCHAR(65535);
+    }
+
+    @Override
+    public void open() throws Exception {
+        LOG.info("Opening ModelUdf: {}", modelName);
+        this.chatModel =
+                OpenAiChatModel.builder().apiKey(apiKey).baseUrl(host).modelName(modelName).build();
+    }

Review Comment:
   IIUC `OpenAiChatModel` is stateless and could be constructed during initialization without side effects. This also explains why we don't have to release any resources in the `close()` method.
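   
   If so, the client could simply be built in `configure(...)` and `open()` dropped (a sketch based on the code above):
   
   ```java
   public void configure(Configuration modelOptions) {
       this.modelName = modelOptions.get(OPENAI_MODEL_NAME);
       this.host = modelOptions.get(OPENAI_HOST);
       this.apiKey = modelOptions.get(OPENAI_API_KEY);
       this.promote = modelOptions.get(OPENAI_CHAT_PROMOTE);
       // Building the client here makes a separate open()/close() pair unnecessary.
       this.chatModel =
               OpenAiChatModel.builder()
                       .apiKey(apiKey)
                       .baseUrl(host)
                       .modelName(modelName)
                       .build();
   }
   ```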



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -123,10 +125,22 @@ private static RelNode sqlToRel(
                 if (udf.getReturnTypeHint() != null) {
                     // This UDF has return type hint annotation
                     returnTypeInference =
-                            o ->
-                                    o.getTypeFactory()
-                                            .createSqlType(
-                                                    convertCalciteType(udf.getReturnTypeHint()));
+                            o -> {
+                                RelDataTypeFactory typeFactory = o.getTypeFactory();
+                                DataType returnTypeHint = udf.getReturnTypeHint();
+
+                                if (returnTypeHint instanceof ArrayType) {
+                                    DataType elementTypeHint =
+                                            ((ArrayType) returnTypeHint).getElementType();
+                                    RelDataType elementType =
+                                            typeFactory.createSqlType(
+                                                    convertCalciteType(elementTypeHint));
+                                    return typeFactory.createArrayType(elementType, -1);
+                                }
+                                }

Review Comment:
   #3740 also fixes a UDF return type hinting issue. Can we use something similar to
   
   ```java
   o -> convertCalciteType(o.getTypeFactory(), udf.getReturnTypeHint());
   ```
   
   and modify `convertCalciteType` to handle `ROW`, `ARRAY`, and `MAP` types correctly? Putting complex cases in a lambda expression might harm readability.
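   
   A rough sketch of such a helper (the name and signature are assumptions, not necessarily what #3740 actually does):
   
   ```java
   private static RelDataType toRelDataType(RelDataTypeFactory typeFactory, DataType dataType) {
       if (dataType instanceof ArrayType) {
           DataType elementType = ((ArrayType) dataType).getElementType();
           // Recurse so nested element types (e.g. ARRAY<FLOAT>) are handled uniformly.
           return typeFactory.createArrayType(toRelDataType(typeFactory, elementType), -1);
       }
       return typeFactory.createSqlType(convertCalciteType(dataType));
   }
   ```
   
   The lambda would then reduce to the one-liner shown above.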



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_HOST;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.OPENAI_MODEL_NAME;
+
+/**
+ * A {@link BuiltInModel} that use Model defined by OpenAI to generate vector data, refer to <a
+ * href="https://docs.langchain4j.dev/integrations/language-models/open-ai/">docs</a>}.
+ */
+public class OpenAIEmbeddingModel implements BuiltInModel, UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenAIEmbeddingModel.class);
+
+    private String host;
+
+    private String apiKey;
+
+    private String modelName;
+
+    private OpenAiEmbeddingModel embeddingModel;
+
+    public void configure(Configuration modelOptions) {
+        this.modelName = modelOptions.get(OPENAI_MODEL_NAME);
+        if (modelName == null) {
+            modelName = "text-embedding-ada-002";
+        }

Review Comment:
   This could be declared as `OPENAI_MODEL_NAME`'s default value.
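   
   I.e., a sketch of the option with the default declared:
   
   ```java
   public static final ConfigOption<String> OPENAI_MODEL_NAME =
           ConfigOptions.key("OpenAI.model-name")
                   .stringType()
                   .defaultValue("text-embedding-ada-002")
                   .withDescription("Name of model to be called.");
   ```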



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java:
##########
@@ -39,12 +44,34 @@ public class UserDefinedFunctionDescriptor implements Serializable {
     private final DataType returnTypeHint;
     private final boolean isCdcPipelineUdf;
 
+    private final Map<String, String> parameters;
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /** Package of {@link org.apache.flink.cdc.runtime.model.BuiltInModel}. */
+    public static final String PREFIX_CLASSPATH_MODEL = "org.apache.flink.cdc.runtime.model.";
+
+    private static final String MODEL_NAME_KEY = "name";
+
     public UserDefinedFunctionDescriptor(String name, String classpath) {
-        this.name = name;
-        this.classpath = classpath;
-        this.className = classpath.substring(classpath.lastIndexOf('.') + 1);
+        if (classpath.contains(".")) {
+            this.parameters = new HashMap<>();
+            this.name = name;
+            this.className = classpath.substring(classpath.lastIndexOf('.') + 1);
+            this.classpath = classpath;
+        } else {
+            // The UserDefinedFunction is a built-in Model.
+            try {
+                parameters = objectMapper.readValue(name, Map.class);
+            } catch (JsonProcessingException e) {
+                throw new IllegalArgumentException(e);
+            }
+            this.name = parameters.get(MODEL_NAME_KEY);
+            this.className = classpath;
+            this.classpath = PREFIX_CLASSPATH_MODEL + classpath;
+        }

Review Comment:
   This is an unnecessary workaround; we can adjust `UserDefinedFunctionDescriptor` to distinguish UDFs and Models more reliably.
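   
   For example, the descriptor could carry the kind and the parameters explicitly instead of inferring them from the classpath and a JSON-encoded name (a hypothetical sketch, not the actual API):
   
   ```java
   /** Hypothetical marker for how the function was declared in the pipeline definition. */
   public enum FunctionKind {
       UDF,
       BUILT_IN_MODEL
   }
   ```
   
   The constructor could then take the kind and the parameter map directly, so no JSON round-trip through the `name` field is needed.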



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
