Copilot commented on code in PR #4394: URL: https://github.com/apache/flink-cdc/pull/4394#discussion_r3241440307
########## flink-cdc-common/src/test/java/org/apache/flink/cdc/common/model/AiModelClientFactoryTest.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.model; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** Tests for the default {@link AiModelClientFactory#validate} method. 
*/ +class AiModelClientFactoryTest { + + private static final String IDENTIFIER = "test-provider"; + private static final String MODEL_NAME = "my-model"; + + private static final class StubFactory implements AiModelClientFactory { + private final Set<String> required; + private final Set<String> optional; + + StubFactory(Set<String> required, Set<String> optional) { + this.required = required; + this.optional = optional; + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Set<String> requiredOptions() { + return required; + } + + @Override + public Set<String> optionalOptions() { + return optional; + } + + @Override + public AiModelClient createClient(ModelContext context) { + return new AiModelClient() {}; + } + } + + private static ModelContext contextWithOptions(Map<String, String> options) { + return new ModelContext() { + @Override + public String getModelName() { + return MODEL_NAME; + } + + @Override + public Map<String, String> getOptions() { + return options; + } + + @Override + public ClassLoader getClassLoader() { + return Thread.currentThread().getContextClassLoader(); + } + }; + } + + @Test + void testValidatePassesWithAllRequiredOptions() { + StubFactory factory = new StubFactory(Set.of("api-key", "endpoint"), Set.of("timeout")); + + Map<String, String> options = new HashMap<>(); + options.put("api-key", "sk-xxx"); + options.put("endpoint", "https://api.example.com"); + + // Should not throw + factory.validate(contextWithOptions(options)); + } + + @Test + void testValidatePassesWithRequiredAndOptionalOptions() { + StubFactory factory = new StubFactory(Set.of("api-key", "endpoint"), Set.of("timeout")); + + Map<String, String> options = new HashMap<>(); + options.put("api-key", "sk-xxx"); + options.put("endpoint", "https://api.example.com"); + options.put("timeout", "30000"); + + factory.validate(contextWithOptions(options)); + } + + @Test + void testValidateThrowsOnMissingRequiredOption() { + StubFactory 
factory = new StubFactory(Set.of("api-key", "endpoint"), Set.of("timeout")); + + // Missing "endpoint" + Map<String, String> options = new HashMap<>(); + options.put("api-key", "sk-xxx"); + + Assertions.assertThatThrownBy(() -> factory.validate(contextWithOptions(options))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Missing required options for model 'my-model' (type='test-provider'): [endpoint]"); + } + + @Test + void testValidateThrowsOnMultipleMissingRequiredOptions() { + StubFactory factory = new StubFactory(Set.of("api-key", "endpoint", "model"), Set.of()); + + // All required options missing + Assertions.assertThatThrownBy( + () -> factory.validate(contextWithOptions(Collections.emptyMap()))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Missing required options for model 'my-model' (type='test-provider'): [endpoint, api-key, model]"); + } + + @Test + void testValidateThrowsOnUnknownOption() { + StubFactory factory = new StubFactory(Set.of("api-key"), Set.of("timeout")); + + Map<String, String> options = new HashMap<>(); + options.put("api-key", "sk-xxx"); + options.put("bogus", "unexpected"); + + Assertions.assertThatThrownBy(() -> factory.validate(contextWithOptions(options))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unknown options for model 'my-model' (type='test-provider'): [bogus]"); + } + + @Test + void testValidateThrowsOnMultipleUnknownOptions() { + StubFactory factory = new StubFactory(Set.of("api-key"), Set.of()); + + Map<String, String> options = new HashMap<>(); + options.put("api-key", "sk-xxx"); + options.put("foo", "a"); + options.put("bar", "b"); + + Assertions.assertThatThrownBy(() -> factory.validate(contextWithOptions(options))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Unknown options for model 'my-model' (type='test-provider'): [bar, foo]"); Review Comment: Tests assert exact error message strings that include the contents of a `Set` (e.g. 
`[endpoint, api-key, model]` and `[bar, foo]`). The `validate()` implementation in `AiModelClientFactory` collects missing/unknown keys into an unordered `HashSet` via `Collectors.toSet()`. `HashSet` iteration order is unspecified, and the `Set.of(...)` sets fed into it have an iteration order that is deliberately randomized per JVM run. These assertions therefore depend on implementation details, so they are brittle and can fail across JDK versions or runs. Either sort the keys before formatting in `validate()`, or use unordered assertions (e.g. parse the bracket contents and assert with AssertJ's `containsExactlyInAnyOrder`). ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java: ########## @@ -68,6 +85,11 @@ public static ExpressionEvaluator compileExpression( argumentClasses.add(Class.forName(udfFunction.getClasspath())); } + for (String paramName : modelClients.keySet()) { + argumentNames.add(paramName); + argumentClasses.add(AiModelClient.class); + } Review Comment: `COMPILED_EXPRESSION_CACHE` is a global static cache keyed by `TransformExpressionKey`, but the parameter list passed to Janino now varies by `modelClients` (each model name becomes a positional parameter). The legacy overload still compiles with `Collections.emptyMap()`. If the legacy overload is invoked first for a key that should bind model-client parameters (e.g. an expression that uses an AI function whose model name happens not to influence the key/expression text), the cached evaluator will be parameter-incompatible with later callers, causing argument-count or compile errors. At minimum, include the model client parameter names/types in the cache key, or remove the legacy overload. ########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.models.openai; + +import org.apache.flink.cdc.common.model.AiModelClient; +import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding; +import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration; + +import com.openai.client.OpenAIClient; +import com.openai.client.okhttp.OpenAIOkHttpClient; +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.embeddings.CreateEmbeddingResponse; +import com.openai.models.embeddings.Embedding; +import com.openai.models.embeddings.EmbeddingCreateParams; + +import java.util.List; + +/** AI model client that connects to any OpenAI-compatible endpoint. 
*/ +public class OpenAiCompatibleModelClient + implements AiModelClient, SupportsTextGeneration, SupportsEmbedding { + + private static final long serialVersionUID = 1L; + + private final String endpoint; + private final String apiKey; + private final String modelName; + + private transient OpenAIClient client; + + public OpenAiCompatibleModelClient(String endpoint, String apiKey, String modelName) { + this.endpoint = endpoint; + this.apiKey = apiKey; + this.modelName = modelName; + } Review Comment: `OpenAiCompatibleModelClient` holds non-transient `endpoint`, `apiKey`, and `modelName` fields and is serialized along with the operator (per the `AiModelClient extends Serializable` contract). The `apiKey` will end up in checkpoints, savepoints, JobManager metadata, and any logs that print the operator state — a security concern, especially for shared/managed Flink clusters. Consider redacting in `toString`, or document this clearly and recommend secret-store integration. At minimum, the field should never appear in log lines. ########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClientFactory.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.models.openai; + +import org.apache.flink.cdc.common.model.AiModelClient; +import org.apache.flink.cdc.common.model.AiModelClientFactory; +import org.apache.flink.cdc.common.model.ModelContext; + +import java.util.Set; + +/** SPI factory for {@link OpenAiCompatibleModelClient}. */ +public class OpenAiCompatibleModelClientFactory implements AiModelClientFactory { + + @Override + public String identifier() { + return "openai-compatible"; + } + + @Override + public Set<String> requiredOptions() { + return Set.of("endpoint", "api-key", "model-name"); + } + + @Override + public Set<String> optionalOptions() { + return Set.of(); + } + + @Override + public AiModelClient createClient(ModelContext context) { + String endpoint = context.getOptions().get("endpoint"); + String apiKey = context.getOptions().get("api-key"); + String modelName = context.getOptions().get("model-name"); + return new OpenAiCompatibleModelClient(endpoint, apiKey, modelName); + } Review Comment: The OpenAI factory hardcodes `requiredOptions()` and `optionalOptions()` as bare string constants ("endpoint", "api-key", "model-name"). The same string keys are also used inline in `createClient()` and would need to be kept in lock-step manually. Extract these as `static final String` constants (or `ConfigOption`s, consistent with how the rest of the codebase declares connector options) to avoid drift between the option set and the keys actually read. 
########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java: ########## @@ -428,24 +427,45 @@ private List<ModelDef> parseModels(JsonNode modelsNode) { } else { modelDefs.add(convertJsonNodeToModelDef(modelsNode)); } + Set<String> seenNames = new HashSet<>(); + for (ModelDef model : modelDefs) { + if (!seenNames.add(model.getName())) { + throw new IllegalArgumentException( + "Duplicate model name '" + model.getName() + "' in pipeline definition."); + } + } return modelDefs; } private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) { + Preconditions.checkArgument( + modelNode instanceof ObjectNode, + "`model` in `pipeline` should be an object, but got %s", + modelNode); + ObjectNode node = (ObjectNode) modelNode; String name = checkNotNull( - modelNode.get(MODEL_NAME_KEY), + node.remove(MODEL_NAME_KEY), "Missing required field \"%s\" in `model`", MODEL_NAME_KEY) .asText(); - String model = + Preconditions.checkArgument( + name.matches("[a-zA-Z_][a-zA-Z0-9_]*") && !name.startsWith("__"), + "Model name \"%s\" is not a valid identifier. " + + "It must start with a letter or underscore, " + + "contain only letters, digits, or underscores, " + + "and must not start with double underscores.", + name); Review Comment: The model name regex `[a-zA-Z_][a-zA-Z0-9_]*` rejects names containing hyphens (e.g. `my-model`), which is a very common convention and was implicitly allowed by the previous `class-name` based scheme (only the model logical name was a free string). Combined with the rename to `name`/`type`, this further narrows the set of valid pipelines users can write. Either document this constraint prominently or make the rule less restrictive — the constraint exists only because the model name is later emitted as a Java identifier in JaninoCompiler, but you could escape/translate hyphens internally instead. 
########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java: ########## @@ -428,24 +427,45 @@ private List<ModelDef> parseModels(JsonNode modelsNode) { } else { modelDefs.add(convertJsonNodeToModelDef(modelsNode)); } + Set<String> seenNames = new HashSet<>(); + for (ModelDef model : modelDefs) { + if (!seenNames.add(model.getName())) { + throw new IllegalArgumentException( + "Duplicate model name '" + model.getName() + "' in pipeline definition."); + } + } return modelDefs; } private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) { + Preconditions.checkArgument( + modelNode instanceof ObjectNode, + "`model` in `pipeline` should be an object, but got %s", + modelNode); + ObjectNode node = (ObjectNode) modelNode; String name = checkNotNull( - modelNode.get(MODEL_NAME_KEY), + node.remove(MODEL_NAME_KEY), "Missing required field \"%s\" in `model`", MODEL_NAME_KEY) .asText(); - String model = + Preconditions.checkArgument( + name.matches("[a-zA-Z_][a-zA-Z0-9_]*") && !name.startsWith("__"), + "Model name \"%s\" is not a valid identifier. " + + "It must start with a letter or underscore, " + + "contain only letters, digits, or underscores, " + + "and must not start with double underscores.", + name); + String type = checkNotNull( - modelNode.get(MODEL_CLASS_NAME_KEY), + node.remove(MODEL_TYPE_KEY), "Missing required field \"%s\" in `model`", - MODEL_CLASS_NAME_KEY) + MODEL_TYPE_KEY) .asText(); - Map<String, String> properties = mapper.convertValue(modelNode, Map.class); - return new ModelDef(name, model, properties); + Map<String, String> options = new HashMap<>(); + node.fields() + .forEachRemaining(entry -> options.put(entry.getKey(), entry.getValue().asText())); + return new ModelDef(name, type, options); } Review Comment: YAML values such as `temperature: 0.7` are converted to strings via `entry.getValue().asText()`. For booleans/numbers this works, but for nested objects or arrays (e.g. 
`headers: { Authorization: ... }` or `stop: ["\n", "###"]`), `asText()` returns an empty string, silently dropping configuration. If providers may want structured options, use `entry.getValue().toString()` or a proper JSON-to-string conversion; otherwise validate up front that all option values are scalar. ########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.models.openai; + +import org.apache.flink.cdc.common.model.AiModelClient; +import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding; +import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration; + +import com.openai.client.OpenAIClient; +import com.openai.client.okhttp.OpenAIOkHttpClient; +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.embeddings.CreateEmbeddingResponse; +import com.openai.models.embeddings.Embedding; +import com.openai.models.embeddings.EmbeddingCreateParams; + +import java.util.List; + +/** AI model client that connects to any OpenAI-compatible endpoint. */ +public class OpenAiCompatibleModelClient + implements AiModelClient, SupportsTextGeneration, SupportsEmbedding { + + private static final long serialVersionUID = 1L; + + private final String endpoint; + private final String apiKey; + private final String modelName; + + private transient OpenAIClient client; + + public OpenAiCompatibleModelClient(String endpoint, String apiKey, String modelName) { + this.endpoint = endpoint; + this.apiKey = apiKey; + this.modelName = modelName; + } + + @Override + public void open() { + client = OpenAIOkHttpClient.builder().baseUrl(endpoint).apiKey(apiKey).build(); + } + + @Override + public void close() { + client = null; Review Comment: `close()` simply nulls out the `client` reference without closing the underlying `OpenAIClient` / OkHttp connection pool. This will leak HTTP connections, threads, and sockets each time a pipeline operator is closed (or restarted from a checkpoint). The OpenAI Java SDK's `OpenAIClient` is `AutoCloseable` — call `client.close()` first, then null the reference. 
########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/model/AiModelClientFactory.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.model; + +import org.apache.flink.cdc.common.annotation.Experimental; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * SPI interface for AI model client factories. Each provider (e.g. OpenAI-compatible, DashScope) + * ships one implementation, discoverable via {@link java.util.ServiceLoader}. + * + * <p>The {@link #identifier()} value maps to the {@code type} field of a {@code pipeline.model} + * entry in the pipeline YAML. + */ +@Experimental +public interface AiModelClientFactory { + + /** A unique, lower-case identifier for this provider, e.g. {@code "openai-compatible"}. */ + String identifier(); + + /** Option keys that must be present in the model YAML options block. */ + Set<String> requiredOptions(); + + /** Option keys that may optionally appear in the model YAML options block. */ + Set<String> optionalOptions(); + + /** + * Validates that the given context contains all required options and no unknown options. 
+ * Subclasses may override this to add custom validation logic. + */ + default void validate(ModelContext context) { + Set<String> required = requiredOptions(); + Set<String> optional = optionalOptions(); + if (required != null) { + Set<String> missing = + required.stream() + .filter(k -> !context.getOptions().containsKey(k)) + .collect(Collectors.toSet()); + if (!missing.isEmpty()) { + throw new IllegalArgumentException( + "Missing required options for model '" + + context.getModelName() + + "' (type='" + + identifier() + + "'): " + + missing); + } + } + if (required != null && optional != null) { + Set<String> unknown = + context.getOptions().keySet().stream() + .filter(k -> !required.contains(k) && !optional.contains(k)) + .collect(Collectors.toSet()); + if (!unknown.isEmpty()) { + throw new IllegalArgumentException( + "Unknown options for model '" + + context.getModelName() + + "' (type='" + + identifier() + + "'): " + + unknown); + } + } + } Review Comment: `requiredOptions()` and `optionalOptions()` are evidently expected to return non-null sets, although their Javadoc never states this. Yet `validate()` defensively allows them to be null (`if (required != null)` / `if (required != null && optional != null)`). The combined check is also incorrect: if `optional == null` but `required != null`, the unknown-options check is skipped entirely, silently accepting any user-provided keys. Either drop the null-tolerance and document the non-null contract explicitly in the Javadoc, or treat null as an empty set so that validation still runs for the other side. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/ai/AiTextFunctionDef.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.ai; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; + +/** + * Built-in AI text generation function definitions with their prompt templates and type metadata. + */ +public enum AiTextFunctionDef { + AI_COMPLETE( + "AI_COMPLETE", + RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"systemPrompt"}), + RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"result"}), + "%s\n"), + + AI_SUMMARIZE( + "AI_SUMMARIZE", + RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"maxLength"}), + RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"summary"}), + "You are a text summarization expert. 
Generate an accurate, coherent, and informative " + + "summary that does not exceed %d characters.\n" + + "Output requirements:\n" + + "- summary: the summarized content\n" + + "Principles:\n" + + "- Stay within the specified length\n" + + "- Preserve core ideas and key information\n" + + "- Use concise language with clear logic\n" + + "- Maintain text coherence\n" + + "- Avoid subjective opinions\n"); + + private final String functionName; + private final RowType inputType; + private final RowType outputType; + private final String promptTemplate; + + AiTextFunctionDef( + String functionName, RowType inputType, RowType outputType, String promptTemplate) { + this.functionName = functionName; + this.inputType = inputType; + this.outputType = outputType; + this.promptTemplate = promptTemplate; + } + + public String getFunctionName() { + return functionName; + } + + /** + * Returns the additional parameter types for promptTemplate placeholders. + * + * <p>Input text parameter is always added by runtime, not included here. + */ + public RowType getInputType() { + return inputType; + } + + public RowType getOutputType() { + return outputType; + } + + /** Builds the core system prompt by filling in the template placeholders. */ + public String buildPrompt(Object... args) { + return String.format(promptTemplate, args); + } Review Comment: The system prompt is built with `String.format(promptTemplate, args)` via `AiTextFunctionDef.buildPrompt`. For `AI_COMPLETE` the template is `"%s\n"` and the user-provided system prompt is passed as a format *argument*. On its own this is safe, because `%` characters inside an argument substituted by `%s` are not interpreted. The risk arises if free-form user text, or an already-built prompt, ever becomes part of the format string itself. For example, the `AI_SUMMARIZE` schema hint might percolate into a string that is later passed through `String.format` again. A literal `%` there can throw `java.util.UnknownFormatConversionException` / `MissingFormatArgumentException` at runtime, breaking the pipeline. Likewise, passing a non-integer value for `AI_SUMMARIZE`'s `%d` placeholder throws `IllegalFormatConversionException`. Consider escaping `%` in any user text that can reach a format string, or using a safer substitution mechanism (e.g. `String.replace` with a placeholder). ########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.models.openai; + +import org.apache.flink.cdc.common.model.AiModelClient; +import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding; +import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration; + +import com.openai.client.OpenAIClient; +import com.openai.client.okhttp.OpenAIOkHttpClient; +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.embeddings.CreateEmbeddingResponse; +import com.openai.models.embeddings.Embedding; +import com.openai.models.embeddings.EmbeddingCreateParams; + +import java.util.List; + +/** AI model client that connects to any OpenAI-compatible endpoint. 
*/ +public class OpenAiCompatibleModelClient + implements AiModelClient, SupportsTextGeneration, SupportsEmbedding { + + private static final long serialVersionUID = 1L; + + private final String endpoint; + private final String apiKey; + private final String modelName; + + private transient OpenAIClient client; + + public OpenAiCompatibleModelClient(String endpoint, String apiKey, String modelName) { + this.endpoint = endpoint; + this.apiKey = apiKey; + this.modelName = modelName; + } + + @Override + public void open() { + client = OpenAIOkHttpClient.builder().baseUrl(endpoint).apiKey(apiKey).build(); + } + + @Override + public void close() { + client = null; + } + + @Override + public String generate(String systemPrompt, String userInput) { + ChatCompletionCreateParams params = + ChatCompletionCreateParams.builder() + .model(modelName) + .addSystemMessage(systemPrompt) + .addUserMessage(userInput) + .build(); + ChatCompletion completion = client.chat().completions().create(params); + return completion.choices().get(0).message().content().orElse(null); + } + + @Override + public float[] embed(String text) { + EmbeddingCreateParams params = + EmbeddingCreateParams.builder().model(modelName).input(text).build(); + CreateEmbeddingResponse response = client.embeddings().create(params); + List<Embedding> data = response.data(); + if (data.isEmpty()) { + return new float[0]; Review Comment: `embed()` returns an empty `float[0]` when the OpenAI response data list is empty. Callers (`AiFunctions.aiEmbed`) wrap this with `Floats.asList`, producing an empty list that is then materialized into the output column without any indication that the embedding actually failed/was missing. A silent empty vector is hard to distinguish from a legitimate zero-length result and will pollute downstream consumers (e.g. vector DBs). Consider throwing or logging a warning when the response contains no embeddings. 
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java: ########## @@ -526,23 +528,44 @@ private static Java.Rvalue generateOtherFunctionOperation( context.udfDescriptors.stream() .filter(e -> e.getName().equalsIgnoreCase(operationName)) .findFirst(); - return udfFunctionOptional - .map( - udfFunction -> - new Java.MethodInvocation( - Location.NOWHERE, - null, - generateInvokeExpression(udfFunction), - atoms)) - .orElseGet( - () -> - new Java.MethodInvocation( - Location.NOWHERE, - null, - StringUtils.convertToCamelCase( - sqlBasicCall.getOperator().getName()), - atoms)); + if (udfFunctionOptional.isPresent()) { + return new Java.MethodInvocation( + Location.NOWHERE, + null, + generateInvokeExpression(udfFunctionOptional.get()), + atoms); + } + if (isAiFunction(operationName) && atoms.length >= 1) { + rewriteAiFunctionModelArg(atoms); + } + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(sqlBasicCall.getOperator().getName()), + atoms); + } + } + + private static boolean isAiFunction(String upperCaseName) { + for (AiTextFunctionDef def : AiTextFunctionDef.values()) { + if (def.getFunctionName().equals(upperCaseName)) { + return true; + } + } + for (AiEmbeddingFunctionDef def : AiEmbeddingFunctionDef.values()) { + if (def.getFunctionName().equals(upperCaseName)) { + return true; + } + } + return false; + } + + private static void rewriteAiFunctionModelArg(Java.Rvalue[] atoms) { + String modelName = atoms[0].toString(); + if (modelName.startsWith("\"") && modelName.endsWith("\"")) { + modelName = modelName.substring(1, modelName.length() - 1); } + atoms[0] = new Java.AmbiguousName(Location.NOWHERE, new String[] {modelName}); } Review Comment: `rewriteAiFunctionModelArg` blindly calls `atoms[0].toString()` and assumes the result is either a quoted string literal `"name"` or an identifier. If the user passes anything else as the first argument to an AI function (e.g. 
a column reference, a function call, a non-string literal), this will produce a malformed Java identifier such as a number or expression text and the generated Janino code will fail to compile with a confusing error. Validate that the first argument is actually a string literal at SQL parse time, or emit a clear error here when stripping quotes does not yield a valid identifier. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/AiFunctionSqlOperatorTable.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.parser.metadata; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.runtime.ai.AiEmbeddingFunctionDef; +import org.apache.flink.cdc.runtime.ai.AiTextFunctionDef; +import org.apache.flink.cdc.runtime.typeutils.CalciteDataTypeConverter; + +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.SqlOperatorTables; + +import java.util.ArrayList; +import java.util.List; + +/** Creates SqlOperatorTable from {@link AiTextFunctionDef} definitions. */ +public class AiFunctionSqlOperatorTable { + + private AiFunctionSqlOperatorTable() {} + + /** Creates an SqlOperatorTable containing all AI functions defined in AiFunctionDef. 
*/ + public static org.apache.calcite.sql.SqlOperatorTable create() { + List<SqlFunction> functions = new ArrayList<>(); + for (AiTextFunctionDef def : AiTextFunctionDef.values()) { + functions.add(createTextSqlFunction(def)); + } + for (AiEmbeddingFunctionDef def : AiEmbeddingFunctionDef.values()) { + functions.add(createEmbeddingSqlFunction(def)); + } + return SqlOperatorTables.of(functions); + } + + private static SqlFunction createTextSqlFunction(AiTextFunctionDef def) { + return new SqlFunction( + def.getFunctionName(), + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.VARIANT), + null, + OperandTypes.family(toSqlTypeFamiliesWithAdditionalParams(def.getInputType())), + SqlFunctionCategory.USER_DEFINED_FUNCTION); + } + + private static SqlFunction createEmbeddingSqlFunction(AiEmbeddingFunctionDef def) { + return new SqlFunction( + def.getFunctionName(), + SqlKind.OTHER_FUNCTION, + opBinding -> + CalciteDataTypeConverter.convertCalciteType( + opBinding.getTypeFactory(), def.getOutputType()), + null, + OperandTypes.family(SqlTypeFamily.STRING, toSqlTypeFamily(def.getInputType())), + SqlFunctionCategory.USER_DEFINED_FUNCTION); + } + + /** + * Converts inputType to SqlTypeFamily array, prepending additional parameters: modelName + * (STRING) and input (STRING). 
+ */ + private static SqlTypeFamily[] toSqlTypeFamiliesWithAdditionalParams(RowType inputType) { + List<SqlTypeFamily> families = new ArrayList<>(); + families.add(SqlTypeFamily.STRING); // modelName + families.add(SqlTypeFamily.STRING); // input + for (DataType fieldType : inputType.getFieldTypes()) { + families.add(toSqlTypeFamily(fieldType)); + } + return families.toArray(new SqlTypeFamily[0]); + } + + private static SqlTypeFamily toSqlTypeFamily(DataType dataType) { + switch (dataType.getTypeRoot()) { + case VARCHAR: + case CHAR: + return SqlTypeFamily.STRING; + case INTEGER: + return SqlTypeFamily.INTEGER; + case BIGINT: + return SqlTypeFamily.NUMERIC; + case FLOAT: + case DOUBLE: + return SqlTypeFamily.APPROXIMATE_NUMERIC; + case BOOLEAN: + return SqlTypeFamily.BOOLEAN; + default: + return SqlTypeFamily.ANY; + } Review Comment: `toSqlTypeFamily` maps `INTEGER` to `SqlTypeFamily.INTEGER` but `BIGINT` to `SqlTypeFamily.NUMERIC`. This is asymmetric: `NUMERIC` is a much broader family that admits decimals/floats and would let users pass a DOUBLE column where a BIGINT is expected. For consistency, map BIGINT to `SqlTypeFamily.INTEGER`, the same family already used for the INTEGER case. In Calcite, the `INTEGER` family covers `TINYINT`, `SMALLINT`, `INTEGER` and `BIGINT`, so this rejects floating-point arguments without any narrowing. (`SqlTypeFamily.EXACT_NUMERIC` would also reject floats, but it additionally admits `DECIMAL`.) ########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java: ########## @@ -96,9 +97,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private static final String UDF_OPTIONS_KEY = "options"; // Model related keys - private static final String MODEL_NAME_KEY = "model-name"; - - private static final String MODEL_CLASS_NAME_KEY = "class-name"; + private static final String MODEL_NAME_KEY = "name"; + private static final String MODEL_TYPE_KEY = "type"; Review Comment: Renaming `model-name`/`class-name` to `name`/`type` is a breaking change to the public pipeline YAML schema.
The PR removes the previous `class-name`-based configuration entirely without a deprecation path or migration shim, so any user upgrading with an existing `pipeline.model` block will fail to parse. Please add a release note flagging this incompatibility, and consider supporting the legacy keys with a deprecation warning for at least one release. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ########## @@ -558,4 +571,28 @@ private void destroyUdf() { udfDescriptors.clear(); udfFunctionInstances.clear(); } + + private void initializeAiModelClients() { + for (Map.Entry<String, AiModelClient> entry : modelClients.entrySet()) { + try { + entry.getValue().open(); + LOG.info("Successfully opened AI model client '{}'.", entry.getKey()); + } catch (Exception e) { + LOG.error("Failed to open AI model client '{}'.", entry.getKey(), e); + throw new FlinkRuntimeException( + "Failed to initialize AI model: " + entry.getKey(), e); + } + } + } + + private void destroyAiModelClients() { + for (Map.Entry<String, AiModelClient> entry : modelClients.entrySet()) { + try { + entry.getValue().close(); + LOG.info("Successfully closed AI model client '{}'.", entry.getKey()); + } catch (Exception e) { + LOG.warn("Failed to close AI model client '{}'.", entry.getKey(), e); + } + } + } Review Comment: `destroyAiModelClients` swallows any exception thrown by `client.close()` with only a `LOG.warn`. Not failing the teardown path is generally desirable. Even so, downstream operational issues (e.g. blocked HTTP threads that are never released) will be invisible to users who only check job status. Because `e` is passed as the last argument, SLF4J already logs the full stack trace, so the remaining gap is visibility outside the logs. Consider collecting the exceptions, for example by adding later ones as suppressed exceptions to the first. Then rethrow once every client has had a chance to close. That way failures surface without preventing the remaining clients from being closed.
Also worth noting that `initializeAiModelClients` runs before `initializeUdf` — if a model `open()` fails, UDFs will never be initialized but the operator's `udfDescriptors`/`udfFunctionInstances` collections may be referenced by other code paths during teardown. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/AiFunctions.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.functions.impl; + +import org.apache.flink.cdc.common.model.AiModelClient; +import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding; +import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.common.types.variant.BinaryVariant; +import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder; +import org.apache.flink.cdc.runtime.ai.AiTextFunctionDef; + +import org.apache.flink.shaded.guava31.com.google.common.primitives.Floats; + +import java.util.List; + +/** Built-in AI functions available as static imports in Janino-compiled transform expressions. 
*/ +public class AiFunctions { Review Comment: `AiFunctions` declares no constructor, so the compiler generates a default public one. Since this class is purely a static utility (matching the pattern of other `*Functions` helper classes), add a private constructor to prevent instantiation, mirroring the style used by `AiFunctionSqlOperatorTable` (which already has `private AiFunctionSqlOperatorTable()`). ########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/pom.xml: ########## @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-pipeline-model</artifactId> + <version>${revision}</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-cdc-pipeline-model-openai-compatible</artifactId> + <packaging>jar</packaging> + + <properties> + <jackson.version>2.13.4</jackson.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.openai</groupId> + <artifactId>openai-java</artifactId> + <version>4.32.0</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <configuration> + <artifactSet> + <includes combine.children="append"> + <include>*:*</include> + </includes> + </artifactSet> + <filters combine.children="append"> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/services/com.fasterxml.**</exclude> + <exclude>META-INF/services/kotlin.**</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>com.fasterxml</pattern> + <shadedPattern>org.apache.flink.cdc.models.openai.shaded.com.fasterxml</shadedPattern> + </relocation> + <relocation> + <pattern>okhttp3</pattern> + <shadedPattern>org.apache.flink.cdc.models.openai.shaded.okhttp3</shadedPattern> + </relocation> + <relocation> + <pattern>okio</pattern> + <shadedPattern>org.apache.flink.cdc.models.openai.shaded.okio</shadedPattern> + </relocation> Review Comment: The shaded `openai-compatible` module relocates `com.fasterxml`, `okhttp3`, and `okio`, but does not relocate `kotlin.*`. 
The OpenAI Java SDK depends on Kotlin stdlib; if a different Kotlin version is loaded by Flink or another connector at runtime, this will conflict. Either add a `kotlin` relocation here or confirm that the OpenAI SDK does not bring kotlin onto the classpath in the shaded jar. ########## flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/test/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClientITCase.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.models.openai; + +import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class OpenAiCompatibleModelClientITCase { + + private OpenAiCompatibleModelClient client; + + @BeforeEach + void setUp() { + String endpoint = System.getenv("OPENAI_BASE_URL"); + String apiKey = System.getenv("OPENAI_API_KEY"); + String model = System.getenv("OPENAI_MODEL"); + Assumptions.assumeThat(endpoint != null && apiKey != null && model != null) + .as("OPENAI_BASE_URL, OPENAI_API_KEY and OPENAI_MODEL must be set") + .isTrue(); + + client = new OpenAiCompatibleModelClient(endpoint, apiKey, model); + client.open(); + } Review Comment: The `Assumptions.assumeThat(...).isTrue()` guard works as intended. When `endpoint`, `apiKey` or `model` is null, the boolean passed to `assumeThat` is false, so AssertJ aborts the assumption. JUnit Jupiter then reports the test as aborted/skipped rather than failed. The `.as("OPENAI_BASE_URL, OPENAI_API_KEY and OPENAI_MODEL must be set")` description is shown only in that case, which is presumably the intent. Two suggestions. First, the same pattern is duplicated in the e2e test, so consider extracting a small helper. Second, double-check that your CI reports these tests as skipped (not passed) when the environment variables are missing, to avoid silently green results.
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java: ########## @@ -526,23 +528,44 @@ private static Java.Rvalue generateOtherFunctionOperation( context.udfDescriptors.stream() .filter(e -> e.getName().equalsIgnoreCase(operationName)) .findFirst(); - return udfFunctionOptional - .map( - udfFunction -> - new Java.MethodInvocation( - Location.NOWHERE, - null, - generateInvokeExpression(udfFunction), - atoms)) - .orElseGet( - () -> - new Java.MethodInvocation( - Location.NOWHERE, - null, - StringUtils.convertToCamelCase( - sqlBasicCall.getOperator().getName()), - atoms)); + if (udfFunctionOptional.isPresent()) { + return new Java.MethodInvocation( + Location.NOWHERE, + null, + generateInvokeExpression(udfFunctionOptional.get()), + atoms); + } + if (isAiFunction(operationName) && atoms.length >= 1) { + rewriteAiFunctionModelArg(atoms); + } + return new Java.MethodInvocation( + Location.NOWHERE, + null, + StringUtils.convertToCamelCase(sqlBasicCall.getOperator().getName()), + atoms); + } + } + + private static boolean isAiFunction(String upperCaseName) { + for (AiTextFunctionDef def : AiTextFunctionDef.values()) { + if (def.getFunctionName().equals(upperCaseName)) { + return true; + } + } + for (AiEmbeddingFunctionDef def : AiEmbeddingFunctionDef.values()) { + if (def.getFunctionName().equals(upperCaseName)) { + return true; + } + } + return false; + } + + private static void rewriteAiFunctionModelArg(Java.Rvalue[] atoms) { + String modelName = atoms[0].toString(); + if (modelName.startsWith("\"") && modelName.endsWith("\"")) { + modelName = modelName.substring(1, modelName.length() - 1); } + atoms[0] = new Java.AmbiguousName(Location.NOWHERE, new String[] {modelName}); } Review Comment: There are no unit tests for `JaninoCompiler.rewriteAiFunctionModelArg` itself nor for `AiFunctionSqlOperatorTable`. 
The new code paths in `JaninoCompiler` (`isAiFunction`, `rewriteAiFunctionModelArg`) are exercised only indirectly via `AiFunctionParserTest`. Given the fragility of the model-name extraction logic (raw `toString()` parse), please add direct tests covering edge cases such as: model name not in quotes, model name being a non-string-literal expression, model name containing characters disallowed by the YAML model-name regex, and case mismatch between `pipeline.model.name` and the SQL literal. ########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/AiFunctionE2eITCase.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.time.Duration; + +/** E2e tests for AI functions with the dummy model and openai-compatible model. 
*/ +class AiFunctionE2eITCase extends PipelineTestEnvironment { + + @Test + void testAiFunctionsWithDummyModel() throws Exception { + String pipelineJob = + "source:\n" + + " type: values\n" + + " event-set.id: SINGLE_SPLIT_MULTI_TABLES\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "transform:\n" + + " - source-table: default_namespace.default_schema.table1\n" + + " projection: col1, AI_COMPLETE('myModel', col1, 'Classify into catA or catB') AS cls\n" + + " - source-table: default_namespace.default_schema.table2\n" + + " projection: col1, AI_EMBED('myModel', col1) AS embedding\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1\n" + + " schema.change.behavior: evolve\n" + + " model:\n" + + " name: myModel\n" + + " type: dummy\n" + + " debug: true\n"; + + Path dummyModelJar = TestUtils.getResource("dummy-model.jar"); + submitPipelineJob(pipelineJob, dummyModelJar); + waitUntilJobFinished(Duration.ofMinutes(3)); + + validateResult( + "Successfully opened AI model client 'myModel'.", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`cls` VARIANT}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], after=[2, {\"result\":\"dummy 
result\",\"summary\":\"TL;DR\"}], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING NOT NULL,`embedding` ARRAY<FLOAT>}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]], op=INSERT, meta=()}", + "Successfully closed AI model client 'myModel'."); Review Comment: `testAiFunctionsWithDummyModel` asserts on log messages such as `"Successfully opened AI model client 'myModel'."` via `validateResult`. These are info-level log strings emitted by the operator (not pipeline output records), and are easily broken by any cosmetic logging change. Tying e2e test correctness to log text is fragile; prefer asserting on emitted records and decouple lifecycle verification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
