bestchinacoder opened a new issue, #2540: URL: https://github.com/apache/incubator-seatunnel/issues/2540
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

I am using the sql plugin and the udf plugin to do custom conversion of my data, and I am trying to call a custom function from the udf plugin inside the SQL. Here is the transform section of my configuration file:

```
transform {
  udf {
    function.myudf = "com.wcq.demo.udf.JdbcUDF"
  }
  sql {
    source_table_name = result_tile_fun
    sql = "select '1234567890' as `#app_id`,'track' as `type`,myudf(event_timestamp) as `#time`,event_name as `#event_name`,user_pseudo_id as `#distinct_id`,myudf(event_previous_timestamp) as event_previous_timestamp,myudf(user_first_touch_timestamp) as user_first_touch_timestamp,event_date,event_bundle_sequence_id,event_server_timestamp_offset,stream_id,platform from result_tile_fun"
    result_table_name = transform_tile_fun
  }
}
```

The source, sink, sql, and udf plugins all load successfully, and the class containing the custom function can be loaded successfully via Class.forName().
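As background for the check described above: a successful `Class.forName()` only proves that the class is on the classpath and (at most) that it exposes a suitable `eval` method; it does not by itself register the function with the SQL validator, which resolves `myudf` by name at query-validation time. A minimal, self-contained sketch of what the classpath check actually verifies (the nested `JdbcUDF` here is a hypothetical stand-in for `com.wcq.demo.udf.JdbcUDF`, and `hasEvalMethod` is an illustrative helper, not a SeaTunnel API):

```java
// Sketch: mirrors the "class loads fine" check from the report. It confirms that
// the class and a public eval(String) method exist on the classpath -- nothing
// more. Registering the function with the SQL engine is a separate step.
public class UdfLoadCheck {

    // Hypothetical stand-in for com.wcq.demo.udf.JdbcUDF.
    public static class JdbcUDF {
        public String eval(String data) {
            return data;
        }
    }

    /** Returns true if className can be loaded and declares a public eval(String). */
    public static boolean hasEvalMethod(String className) {
        try {
            Class<?> cls = Class.forName(className);
            cls.getMethod("eval", String.class); // throws if no such public method
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Binary name of the nested class (default package): UdfLoadCheck$JdbcUDF
        System.out.println(hasEvalMethod("UdfLoadCheck$JdbcUDF")); // prints "true"
        System.out.println(hasEvalMethod("com.example.Missing"));  // prints "false"
    }
}
```

Passing this kind of check while still getting `No match found for function signature` indicates the function name was never made visible to the validator, not that the class is broken.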
But when I use the custom function in the SQL, it fails with the following error:

```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
	at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:60)
	at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:40)
	at org.apache.seatunnel.example.flink.v2.Jdbc2LocalfileExample.main(Jdbc2LocalfileExample.java:33)
Caused by: org.apache.seatunnel.core.starter.exception.TaskExecuteException: SeaTunnel transform task: sql execute error
	at org.apache.seatunnel.core.starter.flink.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:94)
	at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:68)
	at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:58)
	... 2 more
Caused by: java.lang.Exception: Flink streaming transform sql execute failed, SQL: select '1234567890' as `#app_id`,'track' as `type`,myudf(event_timestamp) as `#time`,event_name as `#event_name`,user_pseudo_id as `#distinct_id`,myudf(event_previous_timestamp) as event_previous_timestamp,myudf(user_first_touch_timestamp) as user_first_touch_timestamp,event_date,event_bundle_sequence_id,event_server_timestamp_offset,stream_id,platform from result_tile_fun
	at org.apache.seatunnel.flink.transform.Sql.processStream(Sql.java:53)
	at org.apache.seatunnel.core.starter.flink.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:86)
	... 4 more
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 1, column 52 to line 1, column 73: No match found for function signature myudf(<CHARACTER>)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
	at org.apache.seatunnel.flink.transform.Sql.processStream(Sql.java:51)
	... 5 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 52 to line 1, column 73: No match found for function signature myudf(<CHARACTER>)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4860)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1813)
	at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
	at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
	at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
	at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
	at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:420)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4060)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3346)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
	... 10 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature myudf(<CHARACTER>)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
	... 38 more
```

Here is my custom function class:

```java
public class JdbcUDF extends ScalarFunction {

    public JdbcUDF() {}

    public String eval(String data) {
        return DateFormatUtils.format(new Date(Long.parseLong(data) / 1000), "yyyy-MM-dd HH:mm:ss");
    }
}
```

### SeaTunnel Version

2.1.3 (dev)

### SeaTunnel Config

```conf
env {
  execution.parallelism = 1
  execution.planner = blink
  job.mode = STREAMING
}

source {
  Jdbc {
    driver = com.mysql.cj.jdbc.Driver
    url = "jdbc:mysql://localhost:3306/test"
    user = root
    password = wcq5201314
    query = "select * from `tile-fun`"
    result_table_name = result_tile_fun
  }
}

transform {
  udf {
    function.myudf = "com.wcq.demo.udf.JdbcUDF"
  }
  sql {
    source_table_name = result_tile_fun
    sql = "select '70c892821ee24e2fb7ce02393427e53a' as `#app_id`,myudf(event_timestamp) as `#time`,event_name as `#event_name`,user_pseudo_id as `#distinct_id`,myudf(event_previous_timestamp) as event_previous_timestamp,myudf(user_first_touch_timestamp) as user_first_touch_timestamp,event_date,event_bundle_sequence_id,event_server_timestamp_offset,stream_id,platform from result_tile_fun"
    result_table_name = transform_tile_fun
  }
}

sink {
  TaSink {
    push.url = "https://receiver-ta-dev.thinkingdata.cn/logbus"
    compress = "none"
    push.interval = 1000
    batch.size = 2
    type = "track"
    array.split.regex = ","
    columns = [
      {"colSourceName": "#app_id", "colTargetName": "#app_id"},
      {"colSourceName": "#time", "colTargetName": "#time"},
      {"colSourceName": "#event_name", "colTargetName": "#event_name"},
      {"colSourceName": "#distinct_id", "colTargetName": "#distinct_id"},
      {"colSourceName": "event_previous_timestamp", "colTargetName": "event_previous_timestamp"},
      {"colSourceName": "user_first_touch_timestamp", "colTargetName": "user_first_touch_timestamp"},
      {"colSourceName": "event_date", "colTargetName": "event_date"},
      {"colSourceName": "event_bundle_sequence_id", "colTargetName": "event_bundle_sequence_id"},
      {"colSourceName": "event_server_timestamp_offset", "colTargetName": "event_server_timestamp_offset"},
      {"colSourceName": "stream_id", "colTargetName": "stream_id"},
      {"colSourceName": "platform", "colTargetName": "platform"}
    ]
  }
}
```

### Running Command

```shell
I executed it through the SeaTunnelApiExample class under the seatunnel-flink-connector-v2-example module, with my own configuration file.
```

### Error Exception

```log
See the stack trace above for details.
```

### Flink or Spark Version

Running in local mode in IDEA

### Java or Scala Version

Java 8

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
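For reference, the conversion that `JdbcUDF.eval` performs (dividing by 1000 suggests the input is an epoch value in microseconds, carried as a string, reduced to milliseconds and then formatted) can be reproduced standalone. This sketch swaps commons-lang's `DateFormatUtils` for `java.time` and pins the zone to UTC so the output is deterministic; the original code uses the JVM's default time zone, so its output differs by locale:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampUdfSketch {

    // Same pattern as the UDF in the report; UTC is pinned here (an assumption
    // for reproducibility -- the original relies on the JVM default zone).
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /**
     * Same arithmetic as JdbcUDF.eval: parse the string as a long number of
     * microseconds, divide by 1000 to get milliseconds, then format.
     */
    public static String eval(String data) {
        long micros = Long.parseLong(data);
        return FMT.format(Instant.ofEpochMilli(micros / 1000));
    }

    public static void main(String[] args) {
        // 1660000000000000 µs -> 1660000000000 ms -> 2022-08-08 23:06:40 UTC
        System.out.println(eval("1660000000000000"));
    }
}
```

Note that this logic is independent of the reported error: the validator rejects the query before `eval` is ever invoked, because no function named `myudf` accepting a character argument is known to it at validation time.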
