ruanhang1993 commented on code in PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#discussion_r1694814738
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java:
##########
@@ -118,7 +119,7 @@ public void testJaninoTimestampFunction() throws InvocationTargetException {
List<Object> params = Arrays.asList(epochTime);
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
- JaninoCompiler.loadSystemFunction(expression),
Review Comment:
Please add unit tests and e2e tests for the UDF.
##########
docs/content/docs/core-concept/data-pipeline.md:
##########
@@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin
fenodes: 127.0.0.1:8030
username: root
password: ""
+
+ transform:
+ - source-table: adb.web_order01
+ projection: \*, UPPER(product_name) as product_name
+ filter: id > 10 AND order_id > 100
+ description: project fields and filter
+ - source-table: adb.web_order02
+ projection: \*, UPPER(product_name) as product_name
+ filter: id > 20 AND order_id > 200
+ description: project fields and filter
+
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
- sink-table: ods_db.ods_products
+ sink-table: ods_db.ods_products
+
+ udf:
+ - name: substring
+ classpath: com.example.functions.scalar.SubStringFunctionClass
Review Comment:
Please add the code for SubStringFunctionClass to the doc, and describe how to
add the class to the classpath.
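For illustration, a minimal sketch of what such a class might look like in the doc. The method shape is an assumption inferred from the `ScalarFunctionImpl.create(Class.forName(udf.f1), udf.f0)` call in this PR, which suggests the configured `name` is looked up as a public method on the class; the parameter list and the clamping behaviour are hypothetical, and the package declaration is omitted for brevity.

```java
// Hypothetical sketch of the class referenced by `classpath` in the udf block.
// Assumption: the pipeline looks up a public method whose name equals the
// configured udf `name` ("substring"). In the doc this class would live in
// the package com.example.functions.scalar.
public class SubStringFunctionClass {

    // Public no-arg constructor so the class can be instantiated reflectively.
    public SubStringFunctionClass() {}

    /**
     * Returns the characters of value from beginIndex (inclusive) to endIndex
     * (exclusive). Out-of-range indexes are clamped to the string bounds, and
     * a null input yields null.
     */
    public String substring(String value, int beginIndex, int endIndex) {
        if (value == null) {
            return null;
        }
        int start = Math.max(0, Math.min(beginIndex, value.length()));
        int end = Math.max(start, Math.min(endIndex, value.length()));
        return value.substring(start, end);
    }
}
```

The doc would still need to say how the compiled class is packaged and made visible to the job, since that part cannot be inferred from the diff.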
##########
docs/content/docs/core-concept/data-pipeline.md:
##########
@@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin
fenodes: 127.0.0.1:8030
username: root
password: ""
+
+ transform:
+ - source-table: adb.web_order01
+ projection: \*, UPPER(product_name) as product_name
+ filter: id > 10 AND order_id > 100
+ description: project fields and filter
+ - source-table: adb.web_order02
+ projection: \*, UPPER(product_name) as product_name
+ filter: id > 20 AND order_id > 200
+ description: project fields and filter
+
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
- sink-table: ods_db.ods_products
+ sink-table: ods_db.ods_products
+
+ udf:
+ - name: substring
Review Comment:
Please show how this UDF is used in this pipeline.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -87,15 +96,34 @@ private static SqlParser getCalciteParser(String sql) {
.withLex(Lex.JAVA));
}
- private static RelNode sqlToRel(List<Column> columns, SqlNode sqlNode) {
+ private static RelNode sqlToRel(
+ List<Column> columns, SqlNode sqlNode, List<Tuple2<String, String>> udfs) {
List<Column> columnsWithMetadata =
copyFillMetadataColumn(sqlNode.toString(), columns);
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
+ SchemaPlus schema = rootSchema.plus();
Map<String, Object> operand = new HashMap<>();
operand.put("tableName", DEFAULT_TABLE);
operand.put("columns", columnsWithMetadata);
rootSchema.add(
DEFAULT_SCHEMA,
- TransformSchemaFactory.INSTANCE.create(rootSchema.plus(), DEFAULT_SCHEMA, operand));
+ TransformSchemaFactory.INSTANCE.create(schema, DEFAULT_SCHEMA, operand));
+ List<SqlFunction> udfFunctions = new ArrayList<>();
+ for (Tuple2<String, String> udf : udfs) {
+ try {
+ ScalarFunction function = ScalarFunctionImpl.create(Class.forName(udf.f1), udf.f0);
Review Comment:
Could users reuse their existing Flink SQL scalar functions here? I think we
should support Flink SQL UDFs to make migration easier for users.
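One possible direction, sketched with plain reflection only: Flink SQL scalar functions expose their logic through public methods named `eval`, so the method lookup could fall back to `eval` when the class has no method matching the configured name. `UdfMethodResolver`, `resolve`, and the stand-in `LegacyUpper` class are hypothetical names used for illustration; no Flink or Calcite classes are involved, and lifecycle methods such as `open`/`close` are not handled.

```java
import java.lang.reflect.Method;

public class UdfMethodResolver {

    // Hypothetical stand-in for an existing Flink SQL scalar function. Real ones
    // extend org.apache.flink.table.functions.ScalarFunction and define public
    // eval methods; this class only mimics that shape.
    public static class LegacyUpper {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    /**
     * Picks the method name to bind for a UDF class: the configured udfName if
     * the class has a public method with that name, otherwise "eval" if the
     * class has a public eval method, otherwise null.
     */
    public static String resolve(Class<?> clazz, String udfName) {
        boolean hasEval = false;
        // getMethods() returns only public methods, including inherited ones.
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(udfName)) {
                return udfName;
            }
            if (m.getName().equals("eval")) {
                hasEval = true;
            }
        }
        return hasEval ? "eval" : null;
    }
}
```

The resolved name could then be passed in place of `udf.f0` in the `ScalarFunctionImpl.create` call, though whether that is enough for real Flink functions would need checking.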