yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799076504
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -43,7 +28,6 @@
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
-/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
Review Comment:
Ditto.
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
return new Configuration();
}
- Map<String, String> pipelineConfigMap =
- mapper.convertValue(
- pipelineConfigNode, new TypeReference<Map<String, String>>() {});
+ Map<String, String> pipelineConfigMap = new HashMap<>();
+ pipelineConfigNode
+ .fields()
+ .forEachRemaining(
+ entry -> {
+ String key = entry.getKey();
+ JsonNode value = entry.getValue();
+ if (!key.equals(MODEL_KEY)) {
+ pipelineConfigMap.put(key, value.asText());
+ }
+ });
return Configuration.fromMap(pipelineConfigMap);
}
+
+ private List<ModelDef> parseModels(JsonNode modelsNode) {
+ List<ModelDef> modelDefs = new ArrayList<>();
+ if (modelsNode != null && modelsNode.isArray()) {
Review Comment:
Maybe we can throw an exception if `modelsNode` isn't a list; otherwise, code like this may fail silently:
```yaml
pipeline:
models:
name: ...
```
while the user actually meant to write this:
```yaml
pipeline:
models:
- name: ...
```
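The fail-fast check could look like the sketch below. It uses plain-collection stand-ins for Jackson's `ArrayNode`/`ObjectNode` (a YAML sequence parses to a list, a mis-indented mapping to a map), and the exception type is just an illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in parser: a YAML sequence arrives as a List, a mapping as a Map
// (mirroring how a mis-indented "models:" block would be parsed).
class ModelsNodeValidator {
    static List<Map<String, String>> parseModels(Object modelsNode) {
        List<Map<String, String>> modelDefs = new ArrayList<>();
        if (modelsNode == null) {
            return modelDefs; // "models" is optional
        }
        if (!(modelsNode instanceof List)) {
            // Fail loudly instead of silently producing an empty model list.
            throw new IllegalArgumentException(
                    "\"models\" must be a list of model definitions, but got "
                            + modelsNode.getClass().getSimpleName());
        }
        for (Object model : (List<?>) modelsNode) {
            @SuppressWarnings("unchecked")
            Map<String, String> def = (Map<String, String>) model;
            modelDefs.add(def);
        }
        return modelDefs;
    }
}
```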
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
return new Configuration();
}
- Map<String, String> pipelineConfigMap =
- mapper.convertValue(
- pipelineConfigNode, new TypeReference<Map<String, String>>() {});
+ Map<String, String> pipelineConfigMap = new HashMap<>();
+ pipelineConfigNode
+ .fields()
+ .forEachRemaining(
+ entry -> {
+ String key = entry.getKey();
+ JsonNode value = entry.getValue();
+ if (!key.equals(MODEL_KEY)) {
+ pipelineConfigMap.put(key, value.asText());
+ }
+ });
Review Comment:
The models configuration could be parsed and removed in the `YamlPipelineDefinitionParser#parse` method in advance, so there's no need to iterate over the configuration map again here.
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java:
##########
Review Comment:
I doubt this is the best way to parameterize UDFs. Maybe we can leave `UdfDef` unchanged, let `ModelDef` extend the `UdfDef` class, and put the extra parameters there? Then we won't need any of these string-splitting hacks or serialization chores.
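A rough sketch of that shape, with simplified stand-ins for the real `UdfDef`/`ModelDef` classes (field names taken from this PR; the `UdfDef` constructor signature is assumed):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the existing public UdfDef API.
class UdfDef {
    private final String name;
    private final String classpath;

    UdfDef(String name, String classpath) {
        this.name = name;
        this.classpath = classpath;
    }

    public String getName() { return name; }
    public String getClasspath() { return classpath; }
}

// ModelDef reuses the UdfDef contract and carries its extra parameters
// as typed fields, so no string-splitting or ad-hoc JSON serialization
// is needed to smuggle them through the UDF plumbing.
class ModelDef extends UdfDef {
    private final Map<String, String> parameters;

    ModelDef(String name, String classpath, String host, String apiKey) {
        super(name, classpath);
        Map<String, String> params = new LinkedHashMap<>();
        params.put("host", host);
        params.put("apiKey", apiKey);
        this.parameters = Collections.unmodifiableMap(params);
    }

    public Map<String, String> getParameters() { return parameters; }
}
```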
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
+ private final String name;
+ private final String host;
+ private final String apiKey;
+
+ public ModelDef(String name, String host, String apiKey) {
+ this.name = name;
+ this.host = host;
+ this.apiKey = apiKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getApiKey() {
+ return apiKey;
+ }
+
+ // 创建一个表示这个模型的 UDF
+ public ScalarFunction createUdf() {
+ return new ModelUdf(this);
+ }
+
+ // 内部类,代表这个模型的 UDF
+ public class ModelUdf extends ScalarFunction {
+ private final ModelDef model;
+
+ public ModelUdf(ModelDef model) {
+ this.model = model;
+ }
+
+ // UDF 的主要方法,处理输入并返回结果
+ public String eval(String input) {
+ // 这里实现调用模型 API 的逻辑
+ // 使用 model.getHost() 和 model.getApiKey() 来访问 API
+ // 这只是一个示例实现,实际逻辑需要根据具体的 API 调用方式来编写
+ return "Embedding for: " + input;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ // 初始化逻辑,如建立API连接等
+ }
+
+ @Override
+ public void close() throws Exception {
+ // 清理逻辑,如关闭API连接等
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
Review Comment:
Omitting function body braces violates the code style check rule.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -77,6 +80,53 @@ public static int currentDate(long epochTime, String timezone) {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}
+ private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+ private static OpenAiEmbeddingModel embeddingModel;
+
+ public static void initializeOpenAiEmbeddingModel(String apiKey, String baseUrl) {
+ embeddingModel =
+ OpenAiEmbeddingModel.builder()
+ .apiKey(apiKey)
+ .baseUrl(baseUrl)
+ .modelName(DEFAULT_MODEL_NAME)
+ .timeout(Duration.ofSeconds(30))
+ .maxRetries(3)
+ .build();
+ }
+
+ public static String getEmbedding(String input, String apiKey, String model) {
+ if (input == null || input.trim().isEmpty()) {
+ LOG.debug("Empty or null input provided for embedding.");
+ return "";
+ }
+
+ try {
+ // 确保 OpenAiEmbeddingModel 已初始化
+ if (embeddingModel == null) {
+ initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");
Review Comment:
Is the endpoint hard-coded here? Why do we still need this function and have to pass API keys manually? Shouldn't these be configured in the `models:` block?
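For illustration, the configuration could flow from the parsed `models:` block through a small lookup instead of hard-coded constants; `ModelConfig`, `ModelRegistry`, and their method names are invented here:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder for one entry of the `models:` block.
class ModelConfig {
    final String host;
    final String apiKey;

    ModelConfig(String host, String apiKey) {
        this.host = host;
        this.apiKey = apiKey;
    }
}

// Hypothetical registry populated once from the pipeline definition,
// then consulted by embedding functions instead of hard-coded values.
class ModelRegistry {
    private static final Map<String, ModelConfig> CONFIGS = new ConcurrentHashMap<>();

    static void register(String modelName, ModelConfig config) {
        CONFIGS.put(modelName, config);
    }

    static ModelConfig require(String modelName) {
        ModelConfig config = CONFIGS.get(modelName);
        if (config == null) {
            throw new IllegalStateException(
                    "Model \"" + modelName + "\" is not configured in the models: block");
        }
        return config;
    }
}
```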
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
+ private final String name;
+ private final String host;
+ private final String apiKey;
+
+ public ModelDef(String name, String host, String apiKey) {
+ this.name = name;
+ this.host = host;
+ this.apiKey = apiKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getApiKey() {
+ return apiKey;
+ }
+
+ // 创建一个表示这个模型的 UDF
+ public ScalarFunction createUdf() {
+ return new ModelUdf(this);
+ }
+
+ // 内部类,代表这个模型的 UDF
+ public class ModelUdf extends ScalarFunction {
+ private final ModelDef model;
+
+ public ModelUdf(ModelDef model) {
+ this.model = model;
+ }
+
+ // UDF 的主要方法,处理输入并返回结果
+ public String eval(String input) {
+ // 这里实现调用模型 API 的逻辑
+ // 使用 model.getHost() 和 model.getApiKey() 来访问 API
+ // 这只是一个示例实现,实际逻辑需要根据具体的 API 调用方式来编写
+ return "Embedding for: " + input;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ // 初始化逻辑,如建立API连接等
+ }
+
+ @Override
+ public void close() throws Exception {
+ // 清理逻辑,如关闭API连接等
+ }
+ }
Review Comment:
What is this class for? Seems it's completely useless.
##########
pom.xml:
##########
@@ -41,6 +41,7 @@ limitations under the License.
<module>flink-cdc-runtime</module>
<module>flink-cdc-e2e-tests</module>
<module>flink-cdc-pipeline-udf-examples</module>
+ <module>flink-cdc-openai-udf</module>
Review Comment:
Maybe making it part of `SystemFunctionUtils` is enough? Creating a new top-level module seems excessive.
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -52,6 +36,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
private static final String PIPELINE_KEY = "pipeline";
+ private static final String MODEL_KEY = "models";
Review Comment:
`route`, `transform` and `user-defined-function` are not in plural form.
Maybe use `model` for consistency?
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -99,64 +86,61 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineCon
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
- // Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
- // Sink is required
SinkDef sinkDef =
toSinkDef(
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));
- // Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
.ifPresent(
node ->
node.forEach(
transform ->
transformDefs.add(toTransformDef(transform))));
- // Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
- // UDFs are optional
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
- // Pipeline configs are optional
+ List<ModelDef> modelDefs = new ArrayList<>();
+ JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
Review Comment:
```suggestion
JsonNode modelsNode = ((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(MODEL_KEY);
```
Accessing and removing `MODEL_KEY` could be done here at once.
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
Review Comment:
JavaDoc is required for public classes. This class should also be marked as `@PublicEvolving`.
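For example (the `PublicEvolving` annotation below is a local stand-in for the real Flink CDC annotation, declared here only so the sketch compiles standalone, and the JavaDoc wording is illustrative):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Local stand-in for Flink CDC's @PublicEvolving annotation.
@Retention(RetentionPolicy.RUNTIME)
@interface PublicEvolving {}

/** Definition of an AI model, corresponding to one entry of the {@code models:} block in a pipeline definition. */
@PublicEvolving
class ModelDef {
    private final String name;

    ModelDef(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
```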
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -1,25 +1,9 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
Review Comment:
License headers, JavaDocs and comments should be kept.
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##########
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
+ private final String name;
+ private final String host;
+ private final String apiKey;
+
+ public ModelDef(String name, String host, String apiKey) {
+ this.name = name;
+ this.host = host;
+ this.apiKey = apiKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getApiKey() {
+ return apiKey;
+ }
+
+ // 创建一个表示这个模型的 UDF
Review Comment:
Do not write comments in Chinese.
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -103,7 +103,10 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
TransformTranslator transformTranslator = new TransformTranslator();
stream =
transformTranslator.translatePreTransform(
- stream, pipelineDef.getTransforms(), pipelineDef.getUdfs());
+ stream,
+ pipelineDef.getTransforms(),
+ pipelineDef.getUdfs(),
+ pipelineDef.getModels()); // 添加 models 参数
Review Comment:
Ditto, don't use Chinese comments.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/test/resources/testcontainers.properties:
##########
Review Comment:
Is this change relevant to this PR?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -77,6 +80,53 @@ public static int currentDate(long epochTime, String timezone) {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}
+ private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+ private static OpenAiEmbeddingModel embeddingModel;
+
+ public static void initializeOpenAiEmbeddingModel(String apiKey, String baseUrl) {
+ embeddingModel =
+ OpenAiEmbeddingModel.builder()
+ .apiKey(apiKey)
+ .baseUrl(baseUrl)
+ .modelName(DEFAULT_MODEL_NAME)
+ .timeout(Duration.ofSeconds(30))
+ .maxRetries(3)
+ .build();
+ }
+
+ public static String getEmbedding(String input, String apiKey, String model) {
+ if (input == null || input.trim().isEmpty()) {
+ LOG.debug("Empty or null input provided for embedding.");
+ return "";
+ }
+
+ try {
+ // 确保 OpenAiEmbeddingModel 已初始化
Review Comment:
Remove Chinese comments.
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,11 +80,45 @@ public DataStream<Event> translatePostTransform(
}
}
postTransformFunctionBuilder.addTimezone(timezone);
+
+ List<UdfDef> allFunctions = new ArrayList<>(udfFunctions);
+ allFunctions.addAll(convertModelsToUdfs(models));
+
postTransformFunctionBuilder.addUdfFunctions(
- udfFunctions.stream()
- .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
- .collect(Collectors.toList()));
+ allFunctions.stream().map(this::udfDefToTuple2).collect(Collectors.toList()));
return input.transform(
"Transform:Data", new EventTypeInfo(),
postTransformFunctionBuilder.build());
}
+
+ private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
+ return models.stream().map(this::modelToUdf).collect(Collectors.toList());
+ }
+
+ private UdfDef modelToUdf(ModelDef model) {
+ String udfName = model.getName();
+ String serializedParams = serializeModelParams(model);
+ return new UdfDef(udfName, MODEL_UDF_CLASSPATH, serializedParams);
+ }
+
+ private String serializeModelParams(ModelDef model) {
+ return String.format(
+ "{\"name\":\"%s\",\"host\":\"%s\",\"apiKey\":\"%s\"}",
+ model.getName(), model.getHost(), model.getApiKey());
+ }
+
+ private Tuple2<String, String> udfDefToTuple2(UdfDef udf) {
Review Comment:
This `Tuple2<String, String>` is just an internal representation, while `UdfDef` is a public API. It makes no sense to modify `UdfDef` just to fit models into this internal data structure.
Would it be better to leave `UdfDef` unchanged, and extend `Tuple2<String, String>` to hold both UDF and model functions?
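A sketch of widening only the internal representation; `FunctionSpec` is a made-up name standing in for the current `Tuple2<String, String>`, and `UdfDef` stays untouched:

```java
import java.util.Optional;

// Internal-only descriptor: the public UdfDef API is left as-is, and the
// optional model parameters live purely in this private data structure.
class FunctionSpec {
    final String name;
    final String classpath;
    final String modelHost;   // null for plain UDFs
    final String modelApiKey; // null for plain UDFs

    private FunctionSpec(String name, String classpath, String modelHost, String modelApiKey) {
        this.name = name;
        this.classpath = classpath;
        this.modelHost = modelHost;
        this.modelApiKey = modelApiKey;
    }

    static FunctionSpec ofUdf(String name, String classpath) {
        return new FunctionSpec(name, classpath, null, null);
    }

    static FunctionSpec ofModel(String name, String classpath, String host, String apiKey) {
        return new FunctionSpec(name, classpath, host, apiKey);
    }

    boolean isModel() {
        return modelHost != null;
    }

    Optional<String> host() {
        return Optional.ofNullable(modelHost);
    }
}
```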
##########
flink-cdc-openai-udf/src/main/resources/archetype-resources/src/main/java/App.java:
##########
Review Comment:
Irrelevant files should be excluded from this PR.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -30,16 +35,14 @@
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
Review Comment:
Ditto.
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PipelineDebugEntry.java:
##########
Review Comment:
Seems this file is for local debugging only, and will never work on any
other computer. Maybe you don't want to commit this?
##########
flink-cdc-openai-udf/src/main/java/org/apache/flink/cdc/udf/langchain4j/LangChain4jUDF.java:
##########
@@ -0,0 +1,116 @@
+package org.apache.flink.cdc.udf.langchain4j;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import dev.ai4j.openai4j.OpenAiClient;
+import dev.ai4j.openai4j.chat.ChatCompletionRequest;
+import dev.ai4j.openai4j.chat.ChatCompletionResponse;
+import dev.langchain4j.model.chat.ChatLanguageModel;
+import dev.langchain4j.model.openai.OpenAiChatModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+public class LangChain4jUDF implements UserDefinedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LangChain4jUDF.class);
+ private static final String API_KEY = "sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
+ private static final String MODEL_NAME = "gpt-3.5-turbo";
+ private static final int TIMEOUT_SECONDS = 30;
+ private static final String BASE_URL = "https://api.xty.app/v1/";
Review Comment:
This is not a testing file. Why are these hard-coded configurations put here?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -22,6 +22,11 @@
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.utils.DateTimeUtils;
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import okhttp3.*;
Review Comment:
Don't use `*` import.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java:
##########
@@ -178,6 +178,40 @@ public void lookupOperatorOverloads(
OperandTypes.family(
SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
SqlFunctionCategory.STRING);
+ public static final SqlFunction AI_CHAT_PREDICT =
+ new SqlFunction(
+ "AI_CHAT_PREDICT",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(SqlTypeName.VARCHAR), // Return type is VARCHAR
+ null,
+ OperandTypes.family(
+ SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
+ SqlFunctionCategory.USER_DEFINED_FUNCTION // Category of the function
+ );
+
+ // Define the AI_EMBEDDING function
+ public static final SqlFunction GET_EMBEDDING =
+ new SqlFunction(
+ "GET_EMBEDDING",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(SqlTypeName.VARCHAR), // 返回类型是数组
Review Comment:
`SqlTypeName.VARCHAR), // 返回类型是数组`
This comment seems inaccurate: the declared return type is VARCHAR, not an array. Also, now that we have user-defined models, are these definitions still necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]