This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a19fdfe0f [FLINK-37586][udf] Add support for options in user-defined functions and update related documentation (#4252)
a19fdfe0f is described below

commit a19fdfe0f5efd48514ceebc6135b92ed74ebd152
Author: Jia Fan <[email protected]>
AuthorDate: Thu Mar 5 19:50:12 2026 +0800

    [FLINK-37586][udf] Add support for options in user-defined functions and update related documentation (#4252)
---
 docs/content.zh/docs/core-concept/transform.md     | 45 +++++++++++++
 docs/content/docs/core-concept/transform.md        | 45 +++++++++++++
 .../cli/parser/YamlPipelineDefinitionParser.java   | 13 +++-
 .../parser/YamlPipelineDefinitionParserTest.java   | 47 ++++++++++++++
 .../pipeline-definition-with-udf-options.yaml      | 36 +++++++++++
 .../flink/cdc/composer/definition/UdfDef.java      | 30 ++++++++-
 .../flink/translator/TransformTranslator.java      |  3 +-
 .../cdc/composer/flink/FlinkPipelineUdfITCase.java | 73 ++++++++++++++++++++++
 .../src/test/resources/rules/unexpected.yaml       |  2 +-
 .../examples/java/ConfigurableFunctionClass.java   | 49 +++++++++++++++
 .../examples/scala/ConfigurableFunctionClass.scala | 44 +++++++++++++
 11 files changed, 379 insertions(+), 8 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 5e1a00c9f..a9bae80cd 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -412,6 +412,51 @@ pipeline:
 
 注意这里的 `classpath` 必须是全限定名,并且对应的 `jar` 文件必须包含在 Flink `/lib` 文件夹中,或者通过 `flink-cdc.sh --jar` 选项传递。
 
+### UDF 配置选项
+
+你可以通过添加 `options` 块来向 UDF 传递额外的配置选项。这些选项可以在 `open` 方法中通过 `UserDefinedFunctionContext.configuration()` 获取:
+
+```yaml
+pipeline:
+  user-defined-function:
+    - name: query_redis
+      classpath: com.example.flink.cdc.udf.RedisQueryFunction
+      options:
+        hostname: localhost
+        port: "6379"
+        cache.enabled: "true"
+```
+
+在你的 UDF 实现中,可以通过定义 `ConfigOption` 实例来访问这些配置选项:
+
+```java
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+public class RedisQueryFunction implements UserDefinedFunction {
+    private static final ConfigOption<String> HOSTNAME =
+        ConfigOptions.key("hostname").stringType().noDefaultValue();
+    private static final ConfigOption<Integer> PORT =
+        ConfigOptions.key("port").intType().defaultValue(6379);
+
+    private String hostname;
+    private int port;
+    
+    @Override
+    public void open(UserDefinedFunctionContext context) throws Exception {
+        hostname = context.configuration().get(HOSTNAME);
+        port = context.configuration().get(PORT);
+        // 在这里初始化你的连接...
+    }
+    
+    public Object eval(String key) {
+        // 使用 hostname 和 port 查询 Redis...
+        return null; // 占位实现
+    }
+}
+```
+
+`options` 字段是可选的。如果未指定,将会传递一个空的配置给 UDF。
+
 在正确注册后,UDF 可以在 `projection` 和 `filter` 表达式中使用,就像内置函数一样:
 
 ```yaml
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 22fe8aff1..aed162567 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -417,6 +417,51 @@ pipeline:
 
 Notice that given classpath must be fully-qualified, and corresponding `jar` files must be included in Flink `/lib` folder, or be passed with `flink-cdc.sh --jar` option.
 
+### UDF Options
+
+You can pass extra options to UDFs by adding an `options` block. These options will be available in the `open` method through `UserDefinedFunctionContext.configuration()`:
+
+```yaml
+pipeline:
+  user-defined-function:
+    - name: query_redis
+      classpath: com.example.flink.cdc.udf.RedisQueryFunction
+      options:
+        hostname: localhost
+        port: "6379"
+        cache.enabled: "true"
+```
+
+In your UDF implementation, you can access these options by defining `ConfigOption` instances:
+
+```java
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+public class RedisQueryFunction implements UserDefinedFunction {
+    private static final ConfigOption<String> HOSTNAME =
+        ConfigOptions.key("hostname").stringType().noDefaultValue();
+    private static final ConfigOption<Integer> PORT =
+        ConfigOptions.key("port").intType().defaultValue(6379);
+
+    private String hostname;
+    private int port;
+    
+    @Override
+    public void open(UserDefinedFunctionContext context) throws Exception {
+        hostname = context.configuration().get(HOSTNAME);
+        port = context.configuration().get(PORT);
+        // Initialize your connection here...
+    }
+    
+    public Object eval(String key) {
+        // Query Redis using hostname and port...
+        return null; // placeholder implementation
+    }
+}
+```
+
+The `options` field is optional. If not specified, an empty configuration will be passed to the UDF.
+
 After being correctly registered, UDFs could be used in both `projection` and `filter` expressions, just like built-in functions:
 
 ```yaml
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index efb4e79d2..79886ea08 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -92,6 +92,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     private static final String UDF_KEY = "user-defined-function";
     private static final String UDF_FUNCTION_NAME_KEY = "name";
     private static final String UDF_CLASSPATH_KEY = "classpath";
+    private static final String UDF_OPTIONS_KEY = "options";
 
     // Model related keys
     private static final String MODEL_NAME_KEY = "model-name";
@@ -295,7 +296,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
                 "UDF",
                 udfNode,
                 Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY),
-                Collections.emptyList());
+                Collections.singletonList(UDF_OPTIONS_KEY));
 
         String functionName =
                 checkNotNull(
@@ -310,7 +311,15 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
                                 UDF_CLASSPATH_KEY)
                         .asText();
 
-        return new UdfDef(functionName, classpath);
+        Map<String, String> options =
+                Optional.ofNullable(udfNode.get(UDF_OPTIONS_KEY))
+                        .map(
+                                node ->
+                                        mapper.convertValue(
+                                                node, new TypeReference<Map<String, String>>() {}))
+                        .orElse(null);
+
+        return new UdfDef(functionName, classpath, options);
     }
 
     private TransformDef toTransformDef(JsonNode transformNode) {
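The parser change above reads the optional `options` node into a `Map<String, String>`, passing null through when the block is absent. As a minimal standalone sketch of that behavior, with plain `java.util` maps standing in for Jackson's `JsonNode` (an assumption for illustration; class and method names here are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: plain maps stand in for Jackson's JsonNode.
// An absent `options` key passes through as null; the UdfDef
// constructor (not shown here) normalizes null to an empty map,
// so downstream code never sees null.
public class UdfOptionsParseSketch {
    @SuppressWarnings("unchecked")
    static Map<String, String> readOptions(Map<String, Object> udfNode) {
        return Optional.ofNullable((Map<String, String>) udfNode.get("options"))
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hostname", "localhost");
        opts.put("port", "6379");

        Map<String, Object> udfNode = new LinkedHashMap<>();
        udfNode.put("name", "query_redis");
        udfNode.put("options", opts);

        System.out.println(readOptions(udfNode));               // {hostname=localhost, port=6379}
        System.out.println(readOptions(new LinkedHashMap<>())); // null
    }
}
```

The real code additionally uses `mapper.convertValue(node, new TypeReference<Map<String, String>>() {})` to coerce the YAML node's scalar values to strings.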
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index bf4c377ee..bbfafcf8e 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -197,6 +197,15 @@ class YamlPipelineDefinitionParserTest {
         assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
     }
 
+    @Test
+    void testUdfDefinitionWithOptions() throws Exception {
+        URL resource =
+                Resources.getResource("definitions/pipeline-definition-with-udf-options.yaml");
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
+        assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
+    }
+
     @Test
     void testSchemaEvolutionTypesConfiguration() throws Exception {
         testSchemaEvolutionTypesParsing(
@@ -669,4 +678,42 @@ class YamlPipelineDefinitionParserTest {
                             ImmutableMap.<String, String>builder()
                                     .put("parallelism", "1")
                                     .build()));
+
+    private final PipelineDef pipelineDefWithUdfOptions =
+            new PipelineDef(
+                    new SourceDef("values", null, new Configuration()),
+                    new SinkDef(
+                            "values",
+                            null,
+                            new Configuration(),
+                            ImmutableSet.of(
+                                    DROP_COLUMN,
+                                    ALTER_COLUMN_TYPE,
+                                    ADD_COLUMN,
+                                    CREATE_TABLE,
+                                    RENAME_COLUMN)),
+                    Collections.emptyList(),
+                    Collections.singletonList(
+                            new TransformDef(
+                                    "mydb.web_order",
+                                    "*, query_redis(id) as redis_value",
+                                    "id > 0",
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null)),
+                    Collections.singletonList(
+                            new UdfDef(
+                                    "query_redis",
+                                    "org.apache.flink.cdc.udf.examples.java.RedisQueryFunction",
+                                    ImmutableMap.<String, String>builder()
+                                            .put("hostname", "localhost")
+                                            .put("port", "6379")
+                                            .put("cache.enabled", "true")
+                                            .build())),
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("parallelism", "1")
+                                    .build()));
 }
diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml
new file mode 100644
index 000000000..6a985cbf4
--- /dev/null
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml
@@ -0,0 +1,36 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+source:
+  type: values
+
+sink:
+  type: values
+
+transform:
+  - source-table: mydb.web_order
+    projection: "*, query_redis(id) as redis_value"
+    filter: id > 0
+
+pipeline:
+  parallelism: 1
+  user-defined-function:
+    - name: query_redis
+      classpath: org.apache.flink.cdc.udf.examples.java.RedisQueryFunction
+      options:
+        hostname: localhost
+        port: "6379"
+        cache.enabled: "true"
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
index 6dbc580fb..0486619ec 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.cdc.composer.definition;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -27,15 +29,22 @@ import java.util.Objects;
  * <ul>
  *   <li>name: Static method name of user-defined functions.
  *   <li>classpath: Fully-qualified class path of package containing given function.
+ *   <li>options: Configuration options for the user-defined function.
  * </ul>
  */
 public class UdfDef {
     private final String name;
     private final String classpath;
+    private final Map<String, String> options;
 
     public UdfDef(String name, String classpath) {
+        this(name, classpath, Collections.emptyMap());
+    }
+
+    public UdfDef(String name, String classpath, Map<String, String> options) {
         this.name = name;
         this.classpath = classpath;
+        this.options = options != null ? options : Collections.emptyMap();
     }
 
     public String getName() {
@@ -46,6 +55,10 @@ public class UdfDef {
         return classpath;
     }
 
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -56,16 +69,27 @@ public class UdfDef {
         }
 
         UdfDef udfDef = (UdfDef) o;
-        return Objects.equals(name, udfDef.name) && Objects.equals(classpath, udfDef.classpath);
+        return Objects.equals(name, udfDef.name)
+                && Objects.equals(classpath, udfDef.classpath)
+                && Objects.equals(options, udfDef.options);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, classpath);
+        return Objects.hash(name, classpath, options);
     }
 
     @Override
     public String toString() {
-        return "UdfDef{" + "name='" + name + '\'' + ", classpath='" + classpath + '\'' + '}';
+        return "UdfDef{"
+                + "name='"
+                + name
+                + '\''
+                + ", classpath='"
+                + classpath
+                + '\''
+                + ", options="
+                + options
+                + '}';
     }
 }
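The `UdfDef` change keeps the old two-arg constructor source-compatible by delegating to the new three-arg one, and normalizes a null options map to an empty one so `equals`/`hashCode` stay consistent. A minimal standalone sketch of that contract (hypothetical class name, not the actual `UdfDef`):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of UdfDef's null-tolerant options handling:
// null options are normalized to an immutable empty map in the
// constructor, so a two-arg definition equals a three-arg one built
// with null (or empty) options.
final class UdfDefSketch {
    private final String name;
    private final String classpath;
    private final Map<String, String> options;

    UdfDefSketch(String name, String classpath) {
        this(name, classpath, Collections.emptyMap());
    }

    UdfDefSketch(String name, String classpath, Map<String, String> options) {
        this.name = name;
        this.classpath = classpath;
        this.options = options != null ? options : Collections.emptyMap();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof UdfDefSketch)) return false;
        UdfDefSketch that = (UdfDefSketch) o;
        return Objects.equals(name, that.name)
                && Objects.equals(classpath, that.classpath)
                && Objects.equals(options, that.options);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, classpath, options);
    }
}
```

This normalization is what lets the parser return null for a missing `options` block without any null checks downstream.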
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index 4cc7a0b24..a938d7ab0 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -30,7 +30,6 @@ import org.apache.flink.cdc.runtime.operators.transform.PreTransformOperatorBuil
 import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -133,6 +132,6 @@ public class TransformTranslator {
     }
 
     private Tuple3<String, String, Map<String, String>> udfDefToUDFTuple(UdfDef udf) {
-        return Tuple3.of(udf.getName(), udf.getClasspath(), new HashMap<>());
+        return Tuple3.of(udf.getName(), udf.getClasspath(), udf.getOptions());
     }
 }
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index 5daf370af..9d5a8b22e 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -547,6 +547,79 @@ class FlinkPipelineUdfITCase {
                 .contains("[ LifecycleFunction ] closed. Called 6 times.");
     }
 
+    @ParameterizedTest
+    @MethodSource("testParams")
+    void testConfigurableUdf(ValuesDataSink.SinkApi sinkApi, String language) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*, greet(col1) as greeting",
+                        null,
+                        "col1",
+                        null,
+                        "key1=value1",
+                        "",
+                        null);
+
+        // Setup UDF with options
+        UdfDef udfDef =
+                new UdfDef(
+                        "greet",
+                        String.format(
+                                "org.apache.flink.cdc.udf.examples.%s.ConfigurableFunctionClass",
+                                language),
+                        ImmutableMap.of("greeting", "Hi", "suffix", "~"));
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(transformDef),
+                        Collections.singletonList(udfDef),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`greeting` STRING}, primaryKeys=col1, options=({key1=value1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Hi 1~], op=INSERT, meta=({op_ts=1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Hi 2~], op=INSERT, meta=({op_ts=2})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Hi 3~], op=INSERT, meta=({op_ts=3})}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
+                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Hi 1~], after=[], op=DELETE, meta=({op_ts=4})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Hi 2~], after=[2, x, Hi 2~], op=UPDATE, meta=({op_ts=5})}");
+    }
+
     // --------------------------
     // Flink-compatible UDF tests
     // --------------------------
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
index 293f76cd4..ff7339a4c 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
@@ -79,5 +79,5 @@ steps:
             language: clojure
     error: |
       Unexpected key `language` in YAML UDF block.
-      Allowed keys in this context are: [name, classpath]
+      Allowed keys in this context are: [name, classpath, options]
       Note: option language: "clojure" is unexpected. It was silently ignored in previous versions, and probably should be removed.
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java
new file mode 100644
index 000000000..1ce7309b7
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
+
+/** This is an example UDF class that reads options from configuration. */
+public class ConfigurableFunctionClass implements UserDefinedFunction {
+
+    private static final ConfigOption<String> GREETING =
+            ConfigOptions.key("greeting").stringType().defaultValue("Hello");
+
+    private static final ConfigOption<String> SUFFIX =
+            ConfigOptions.key("suffix").stringType().defaultValue("!");
+
+    private String greeting;
+    private String suffix;
+
+    public String eval(String value) {
+        return greeting + " " + value + suffix;
+    }
+
+    @Override
+    public void open(UserDefinedFunctionContext context) throws Exception {
+        greeting = context.configuration().get(GREETING);
+        suffix = context.configuration().get(SUFFIX);
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala
new file mode 100644
index 000000000..6405ef715
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala
+
+import org.apache.flink.cdc.common.configuration.ConfigOptions
+import org.apache.flink.cdc.common.udf.{UserDefinedFunction, UserDefinedFunctionContext}
+
+/** This is an example UDF class that reads options from configuration. */
+class ConfigurableFunctionClass extends UserDefinedFunction {
+
+  private var greeting: String = "Hello"
+  private var suffix: String = "!"
+
+  def eval(value: String): String = {
+    greeting + " " + value + suffix
+  }
+
+  override def open(context: UserDefinedFunctionContext): Unit = {
+    greeting = context.configuration().get(ConfigurableFunctionClass.GREETING)
+    suffix = context.configuration().get(ConfigurableFunctionClass.SUFFIX)
+  }
+
+  override def close(): Unit = {}
+}
+
+object ConfigurableFunctionClass {
+  private val GREETING = ConfigOptions.key("greeting").stringType().defaultValue("Hello")
+  private val SUFFIX = ConfigOptions.key("suffix").stringType().defaultValue("!")
+}
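Both example classes reduce to the same defaulting behavior: options missing from the configuration fall back to the `ConfigOption` defaults. A standalone Java sketch of that behavior, with `Map.getOrDefault` standing in for `Configuration.get(ConfigOption)` (an assumption for illustration; the class name is hypothetical):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: getOrDefault models the ConfigOption defaults
// ("Hello" and "!"), so an empty options map yields the defaults and
// a populated one overrides them.
public class GreetSketch {
    static String eval(Map<String, String> options, String value) {
        String greeting = options.getOrDefault("greeting", "Hello");
        String suffix = options.getOrDefault("suffix", "!");
        return greeting + " " + value + suffix;
    }

    public static void main(String[] args) {
        System.out.println(eval(Collections.emptyMap(), "Alice")); // Hello Alice!
        Map<String, String> options = new LinkedHashMap<>();
        options.put("greeting", "Hi");
        options.put("suffix", "~");
        System.out.println(eval(options, "Bob"));                  // Hi Bob~
    }
}
```

This matches the expectations in the integration test above, where `greeting=Hi` and `suffix=~` produce values like `Hi 1~`.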
