This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a19fdfe0f [FLINK-37586][udf] Add support for options in user-defined
functions and update related documentation (#4252)
a19fdfe0f is described below
commit a19fdfe0f5efd48514ceebc6135b92ed74ebd152
Author: Jia Fan <[email protected]>
AuthorDate: Thu Mar 5 19:50:12 2026 +0800
[FLINK-37586][udf] Add support for options in user-defined functions and
update related documentation (#4252)
---
docs/content.zh/docs/core-concept/transform.md | 45 +++++++++++++
docs/content/docs/core-concept/transform.md | 45 +++++++++++++
.../cli/parser/YamlPipelineDefinitionParser.java | 13 +++-
.../parser/YamlPipelineDefinitionParserTest.java | 47 ++++++++++++++
.../pipeline-definition-with-udf-options.yaml | 36 +++++++++++
.../flink/cdc/composer/definition/UdfDef.java | 30 ++++++++-
.../flink/translator/TransformTranslator.java | 3 +-
.../cdc/composer/flink/FlinkPipelineUdfITCase.java | 73 ++++++++++++++++++++++
.../src/test/resources/rules/unexpected.yaml | 2 +-
.../examples/java/ConfigurableFunctionClass.java | 49 +++++++++++++++
.../examples/scala/ConfigurableFunctionClass.scala | 44 +++++++++++++
11 files changed, 379 insertions(+), 8 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md
b/docs/content.zh/docs/core-concept/transform.md
index 5e1a00c9f..a9bae80cd 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -412,6 +412,51 @@ pipeline:
注意这里的 `classpath` 必须是全限定名,并且对应的 `jar` 文件必须包含在 Flink `/lib` 文件夹中,或者通过
`flink-cdc.sh --jar` 选项传递。
+### UDF 配置选项
+
+你可以通过添加 `options` 块来向 UDF 传递额外的配置选项。这些选项可以在 `open` 方法中通过
`UserDefinedFunctionContext.configuration()` 获取:
+
+```yaml
+pipeline:
+ user-defined-function:
+ - name: query_redis
+ classpath: com.example.flink.cdc.udf.RedisQueryFunction
+ options:
+ hostname: localhost
+ port: "6379"
+ cache.enabled: "true"
+```
+
+在你的 UDF 实现中,可以通过定义 `ConfigOption` 实例来访问这些配置选项:
+
+```java
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+public class RedisQueryFunction implements UserDefinedFunction {
+ private static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname").stringType().noDefaultValue();
+ private static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port").intType().defaultValue(6379);
+
+ private String hostname;
+ private int port;
+
+ @Override
+ public void open(UserDefinedFunctionContext context) throws Exception {
+ hostname = context.configuration().get(HOSTNAME);
+ port = context.configuration().get(PORT);
+ // 在这里初始化你的连接...
+ }
+
+ public Object eval(String key) {
+ // 使用 hostname 和 port 查询 Redis...
+ }
+}
+```
+
+`options` 字段是可选的。如果未指定,将会传递一个空的配置给 UDF。
+
在正确注册后,UDF 可以在 `projection` 和 `filter` 表达式中使用,就像内置函数一样:
```yaml
diff --git a/docs/content/docs/core-concept/transform.md
b/docs/content/docs/core-concept/transform.md
index 22fe8aff1..aed162567 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -417,6 +417,51 @@ pipeline:
Notice that given classpath must be fully-qualified, and corresponding `jar`
files must be included in Flink `/lib` folder, or be passed with `flink-cdc.sh
--jar` option.
+### UDF Options
+
+You can pass extra options to UDFs by adding an `options` block. These options
will be available in the `open` method through
`UserDefinedFunctionContext.configuration()`:
+
+```yaml
+pipeline:
+ user-defined-function:
+ - name: query_redis
+ classpath: com.example.flink.cdc.udf.RedisQueryFunction
+ options:
+ hostname: localhost
+ port: "6379"
+ cache.enabled: "true"
+```
+
+And in your UDF implementation, you can access these options by defining
`ConfigOption` instances:
+
+```java
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+public class RedisQueryFunction implements UserDefinedFunction {
+ private static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname").stringType().noDefaultValue();
+ private static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port").intType().defaultValue(6379);
+
+ private String hostname;
+ private int port;
+
+ @Override
+ public void open(UserDefinedFunctionContext context) throws Exception {
+ hostname = context.configuration().get(HOSTNAME);
+ port = context.configuration().get(PORT);
+ // Initialize your connection here...
+ }
+
+ public Object eval(String key) {
+ // Query Redis using hostname and port...
+ }
+}
+```
+
+The `options` field is optional. If not specified, an empty configuration will
be passed to the UDF.
+
After being correctly registered, UDFs could be used in both `projection` and
`filter` expressions, just like built-in functions:
```yaml
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index efb4e79d2..79886ea08 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -92,6 +92,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
private static final String UDF_KEY = "user-defined-function";
private static final String UDF_FUNCTION_NAME_KEY = "name";
private static final String UDF_CLASSPATH_KEY = "classpath";
+ private static final String UDF_OPTIONS_KEY = "options";
// Model related keys
private static final String MODEL_NAME_KEY = "model-name";
@@ -295,7 +296,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
"UDF",
udfNode,
Arrays.asList(UDF_FUNCTION_NAME_KEY, UDF_CLASSPATH_KEY),
- Collections.emptyList());
+ Collections.singletonList(UDF_OPTIONS_KEY));
String functionName =
checkNotNull(
@@ -310,7 +311,15 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
UDF_CLASSPATH_KEY)
.asText();
- return new UdfDef(functionName, classpath);
+ Map<String, String> options =
+ Optional.ofNullable(udfNode.get(UDF_OPTIONS_KEY))
+ .map(
+ node ->
+ mapper.convertValue(
+ node, new
TypeReference<Map<String, String>>() {}))
+ .orElse(null);
+
+ return new UdfDef(functionName, classpath, options);
}
private TransformDef toTransformDef(JsonNode transformNode) {
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index bf4c377ee..bbfafcf8e 100644
---
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -197,6 +197,15 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
}
+ @Test
+ void testUdfDefinitionWithOptions() throws Exception {
+ URL resource =
+
Resources.getResource("definitions/pipeline-definition-with-udf-options.yaml");
+ YamlPipelineDefinitionParser parser = new
YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new
Configuration());
+ assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
+ }
+
@Test
void testSchemaEvolutionTypesConfiguration() throws Exception {
testSchemaEvolutionTypesParsing(
@@ -669,4 +678,42 @@ class YamlPipelineDefinitionParserTest {
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.build()));
+
+ private final PipelineDef pipelineDefWithUdfOptions =
+ new PipelineDef(
+ new SourceDef("values", null, new Configuration()),
+ new SinkDef(
+ "values",
+ null,
+ new Configuration(),
+ ImmutableSet.of(
+ DROP_COLUMN,
+ ALTER_COLUMN_TYPE,
+ ADD_COLUMN,
+ CREATE_TABLE,
+ RENAME_COLUMN)),
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+ "mydb.web_order",
+ "*, query_redis(id) as redis_value",
+ "id > 0",
+ null,
+ null,
+ null,
+ null,
+ null)),
+ Collections.singletonList(
+ new UdfDef(
+ "query_redis",
+
"org.apache.flink.cdc.udf.examples.java.RedisQueryFunction",
+ ImmutableMap.<String, String>builder()
+ .put("hostname", "localhost")
+ .put("port", "6379")
+ .put("cache.enabled", "true")
+ .build())),
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("parallelism", "1")
+ .build()));
}
diff --git
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml
new file mode 100644
index 000000000..6a985cbf4
--- /dev/null
+++
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-udf-options.yaml
@@ -0,0 +1,36 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+source:
+ type: values
+
+sink:
+ type: values
+
+transform:
+ - source-table: mydb.web_order
+ projection: "*, query_redis(id) as redis_value"
+ filter: id > 0
+
+pipeline:
+ parallelism: 1
+ user-defined-function:
+ - name: query_redis
+ classpath: org.apache.flink.cdc.udf.examples.java.RedisQueryFunction
+ options:
+ hostname: localhost
+ port: "6379"
+ cache.enabled: "true"
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
index 6dbc580fb..0486619ec 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.composer.definition;
+import java.util.Collections;
+import java.util.Map;
import java.util.Objects;
/**
@@ -27,15 +29,22 @@ import java.util.Objects;
* <ul>
* <li>name: Static method name of user-defined functions.
* <li>classpath: Fully-qualified class path of package containing given
function.
+ * <li>options: Configuration options for the user-defined function.
* </ul>
*/
public class UdfDef {
private final String name;
private final String classpath;
+ private final Map<String, String> options;
public UdfDef(String name, String classpath) {
+ this(name, classpath, Collections.emptyMap());
+ }
+
+ public UdfDef(String name, String classpath, Map<String, String> options) {
this.name = name;
this.classpath = classpath;
+ this.options = options != null ? options : Collections.emptyMap();
}
public String getName() {
@@ -46,6 +55,10 @@ public class UdfDef {
return classpath;
}
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -56,16 +69,27 @@ public class UdfDef {
}
UdfDef udfDef = (UdfDef) o;
- return Objects.equals(name, udfDef.name) && Objects.equals(classpath,
udfDef.classpath);
+ return Objects.equals(name, udfDef.name)
+ && Objects.equals(classpath, udfDef.classpath)
+ && Objects.equals(options, udfDef.options);
}
@Override
public int hashCode() {
- return Objects.hash(name, classpath);
+ return Objects.hash(name, classpath, options);
}
@Override
public String toString() {
- return "UdfDef{" + "name='" + name + '\'' + ", classpath='" +
classpath + '\'' + '}';
+ return "UdfDef{"
+ + "name='"
+ + name
+ + '\''
+ + ", classpath='"
+ + classpath
+ + '\''
+ + ", options="
+ + options
+ + '}';
}
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index 4cc7a0b24..a938d7ab0 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -30,7 +30,6 @@ import
org.apache.flink.cdc.runtime.operators.transform.PreTransformOperatorBuil
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -133,6 +132,6 @@ public class TransformTranslator {
}
private Tuple3<String, String, Map<String, String>>
udfDefToUDFTuple(UdfDef udf) {
- return Tuple3.of(udf.getName(), udf.getClasspath(), new HashMap<>());
+ return Tuple3.of(udf.getName(), udf.getClasspath(), udf.getOptions());
}
}
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index 5daf370af..9d5a8b22e 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -547,6 +547,79 @@ class FlinkPipelineUdfITCase {
.contains("[ LifecycleFunction ] closed. Called 6 times.");
}
+ @ParameterizedTest
+ @MethodSource("testParams")
+ void testConfigurableUdf(ValuesDataSink.SinkApi sinkApi, String language)
throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup transform
+ TransformDef transformDef =
+ new TransformDef(
+ "default_namespace.default_schema.table1",
+ "*, greet(col1) as greeting",
+ null,
+ "col1",
+ null,
+ "key1=value1",
+ "",
+ null);
+
+ // Setup UDF with options
+ UdfDef udfDef =
+ new UdfDef(
+ "greet",
+ String.format(
+
"org.apache.flink.cdc.udf.examples.%s.ConfigurableFunctionClass",
+ language),
+ ImmutableMap.of("greeting", "Hi", "suffix", "~"));
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(transformDef),
+ Collections.singletonList(udfDef),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(outputEvents)
+ .containsExactly(
+
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`greeting` STRING},
primaryKeys=col1, options=({key1=value1})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[1, 1, Hi 1~], op=INSERT, meta=({op_ts=1})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[2, 2, Hi 2~], op=INSERT, meta=({op_ts=2})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[3, 3, Hi 3~], op=INSERT, meta=({op_ts=3})}",
+
"AddColumnEvent{tableId=default_namespace.default_schema.table1,
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER,
existedColumnName=col2}]}",
+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1,
nameMapping={col2=newCol2, col3=newCol3}}",
+
"DropColumnEvent{tableId=default_namespace.default_schema.table1,
droppedColumnNames=[newCol2]}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1,
Hi 1~], after=[], op=DELETE, meta=({op_ts=4})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ,
Hi 2~], after=[2, x, Hi 2~], op=UPDATE, meta=({op_ts=5})}");
+ }
+
// --------------------------
// Flink-compatible UDF tests
// --------------------------
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
index 293f76cd4..ff7339a4c 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
@@ -79,5 +79,5 @@ steps:
language: clojure
error: |
Unexpected key `language` in YAML UDF block.
- Allowed keys in this context are: [name, classpath]
+ Allowed keys in this context are: [name, classpath, options]
Note: option language: "clojure" is unexpected. It was silently ignored
in previous versions, and probably should be removed.
diff --git
a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java
b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java
new file mode 100644
index 000000000..1ce7309b7
--- /dev/null
+++
b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ConfigurableFunctionClass.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
+
+/** This is an example UDF class that reads options from configuration. */
+public class ConfigurableFunctionClass implements UserDefinedFunction {
+
+ private static final ConfigOption<String> GREETING =
+ ConfigOptions.key("greeting").stringType().defaultValue("Hello");
+
+ private static final ConfigOption<String> SUFFIX =
+ ConfigOptions.key("suffix").stringType().defaultValue("!");
+
+ private String greeting;
+ private String suffix;
+
+ public String eval(String value) {
+ return greeting + " " + value + suffix;
+ }
+
+ @Override
+ public void open(UserDefinedFunctionContext context) throws Exception {
+ greeting = context.configuration().get(GREETING);
+ suffix = context.configuration().get(SUFFIX);
+ }
+
+ @Override
+ public void close() throws Exception {}
+}
diff --git
a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala
b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala
new file mode 100644
index 000000000..6405ef715
--- /dev/null
+++
b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/ConfigurableFunctionClass.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala
+
+import org.apache.flink.cdc.common.configuration.ConfigOptions
+import org.apache.flink.cdc.common.udf.{UserDefinedFunction,
UserDefinedFunctionContext}
+
+/** This is an example UDF class that reads options from configuration. */
+class ConfigurableFunctionClass extends UserDefinedFunction {
+
+ private var greeting: String = "Hello"
+ private var suffix: String = "!"
+
+ def eval(value: String): String = {
+ greeting + " " + value + suffix
+ }
+
+ override def open(context: UserDefinedFunctionContext): Unit = {
+ greeting = context.configuration().get(ConfigurableFunctionClass.GREETING)
+ suffix = context.configuration().get(ConfigurableFunctionClass.SUFFIX)
+ }
+
+ override def close(): Unit = {}
+}
+
+object ConfigurableFunctionClass {
+ private val GREETING =
ConfigOptions.key("greeting").stringType().defaultValue("Hello")
+ private val SUFFIX =
ConfigOptions.key("suffix").stringType().defaultValue("!")
+}