bestchinacoder opened a new issue, #2540:
URL: https://github.com/apache/incubator-seatunnel/issues/2540

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I am using the sql and udf transform plugins to convert data, and I am trying to call a custom function declared in the udf plugin from the sql plugin.
   Here is the transform section of my configuration file:
   ```
   transform {
       udf {
           function.myudf = "com.wcq.demo.udf.JdbcUDF"
       }
       sql {
           source_table_name = result_tile_fun
           sql = "select '1234567890' as `#app_id`,'track' as `type`,myudf(event_timestamp) as `#time`,event_name as `#event_name`,user_pseudo_id as `#distinct_id`,myudf(event_previous_timestamp) as event_previous_timestamp,myudf(user_first_touch_timestamp) as user_first_touch_timestamp,event_date,event_bundle_sequence_id,event_server_timestamp_offset,stream_id,platform from result_tile_fun"
           result_table_name = transform_tile_fun
       }
   }
   ```
   The source, sink, sql, and udf plugins all load successfully, and the class that contains the custom function can be loaded via Class.forName().
   However, when I call the custom function in sql, I get the following error:
   ```
   Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
        at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:60)
        at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:40)
        at org.apache.seatunnel.example.flink.v2.Jdbc2LocalfileExample.main(Jdbc2LocalfileExample.java:33)
   Caused by: org.apache.seatunnel.core.starter.exception.TaskExecuteException: SeaTunnel transform task: sql execute error
        at org.apache.seatunnel.core.starter.flink.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:94)
        at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:68)
        at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:58)
        ... 2 more
   Caused by: java.lang.Exception: Flink streaming transform sql execute failed, SQL: select '1234567890' as `#app_id`,'track' as `type`,myudf(event_timestamp) as `#time`,event_name as `#event_name`,user_pseudo_id as `#distinct_id`,myudf(event_previous_timestamp) as event_previous_timestamp,myudf(user_first_touch_timestamp) as user_first_touch_timestamp,event_date,event_bundle_sequence_id,event_server_timestamp_offset,stream_id,platform from result_tile_fun
        at org.apache.seatunnel.flink.transform.Sql.processStream(Sql.java:53)
        at org.apache.seatunnel.core.starter.flink.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:86)
        ... 4 more
   Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 52 to line 1, column 73: No match found for function signature myudf(<CHARACTER>)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
        at org.apache.seatunnel.flink.transform.Sql.processStream(Sql.java:51)
        ... 5 more
   Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 52 to line 1, column 73: No match found for function signature myudf(<CHARACTER>)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4860)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1813)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
        at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
        at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:420)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4060)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3346)
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
        ... 10 more
   Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature myudf(<CHARACTER>)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
        ... 38 more
   ```
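
A note on reading this error: Calcite raises "No match found for function signature" when it cannot resolve a function with that name and those argument types, so a successful Class.forName() only shows that the class is on the classpath. As a rough sanity check on the class side, the sketch below uses plain reflection to confirm that a class has a public no-argument constructor and a public, non-static eval(String) method. `UdfShapeCheck` and the `SampleUdf` stand-in are hypothetical names made up for illustration; they are not part of SeaTunnel or Flink, and passing this check says nothing about whether the function was registered with the table environment.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class UdfShapeCheck {

    /** Hypothetical stand-in with the same shape as the UDF in this report. */
    public static class SampleUdf {
        public SampleUdf() {}

        public String eval(String data) {
            return data;
        }
    }

    /**
     * Returns true if the named class can be loaded, is public, has a public
     * no-arg constructor, and has a public non-static eval(String) method.
     */
    public static boolean hasStringEval(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            clazz.getConstructor(); // throws if there is no public no-arg constructor
            Method eval = clazz.getMethod("eval", String.class); // public methods only
            return Modifier.isPublic(clazz.getModifiers())
                    && !Modifier.isStatic(eval.getModifiers());
        } catch (ReflectiveOperationException e) {
            // Class not found, or constructor/method missing.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasStringEval(SampleUdf.class.getName()));
        System.out.println(hasStringEval("java.lang.Object"));
    }
}
```

If a check like this passes for the real class, the remaining suspects are on the registration side: whether the name myudf was actually registered before the sql transform ran.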
   Here is my custom function class
   ```
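   package com.wcq.demo.udf;

   import java.util.Date;

   // Assumed source of DateFormatUtils (commons-lang3); the original listing omitted its imports.
   import org.apache.commons.lang3.time.DateFormatUtils;
   import org.apache.flink.table.functions.ScalarFunction;

   public class JdbcUDF extends ScalarFunction {

       public JdbcUDF() {}

       // Date expects milliseconds, so the parsed value is divided by 1000 first.
       public String eval(String data) {
           return DateFormatUtils.format(new Date(Long.parseLong(data) / 1000), "yyyy-MM-dd HH:mm:ss");
       }

   }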
   ```
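
For reference, the conversion that eval performs can be reproduced with only the JDK, which makes it easy to test outside Flink. This is a sketch under assumptions: the input is taken to be an epoch timestamp in microseconds passed as a string (which is what the division by 1000 suggests), and the zone is pinned to UTC so the output is reproducible, whereas DateFormatUtils.format uses the JVM default time zone. The class name `MicrosToDateTime` is made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class MicrosToDateTime {

    // Same pattern as the UDF, but with an explicit UTC zone (an assumption of this sketch).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /** Converts an epoch-microseconds string to a formatted UTC date-time string. */
    public static String format(String micros) {
        long millis = Long.parseLong(micros) / 1000; // microseconds -> milliseconds
        return FORMAT.format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        // 1661472000 seconds after the epoch, expressed in microseconds.
        System.out.println(format("1661472000000000")); // prints 2022-08-26 00:00:00
    }
}
```

Note that Long.parseLong throws NumberFormatException on a null or non-numeric argument, so a null column value would fail in the same way inside the UDF.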
   
   
   ### SeaTunnel Version
   
   Version 2.1.3 -- dev
   
   ### SeaTunnel Config
   
   ```conf
   env {
       execution.parallelism = 1
       execution.planner = blink
       job.mode = STREAMING
   }
   
   source {
       Jdbc {
           driver = com.mysql.cj.jdbc.Driver
           url = "jdbc:mysql://localhost:3306/test"
           user = root
           password = wcq5201314
           query = "select * from `tile-fun`"
           result_table_name = result_tile_fun
       }
   }
   
   transform {
       udf {
           function.myudf = "com.wcq.demo.udf.JdbcUDF"
       }
       sql {
           source_table_name = result_tile_fun
           sql = "select '70c892821ee24e2fb7ce02393427e53a' as `#app_id`,myudf(event_timestamp) as `#time`,event_name as `#event_name`,user_pseudo_id as `#distinct_id`,myudf(event_previous_timestamp) as event_previous_timestamp,myudf(user_first_touch_timestamp) as user_first_touch_timestamp,event_date,event_bundle_sequence_id,event_server_timestamp_offset,stream_id,platform from result_tile_fun"
           result_table_name = transform_tile_fun
       }
   }
   
   sink {
     TaSink {
       push.url = "https://receiver-ta-dev.thinkingdata.cn/logbus";
       compress = "none"
       push.interval = 1000
       batch.size = 2
       type = "track"
       array.split.regex = ","
       columns = [
                 {"colSourceName":"#app_id","colTargetName":"#app_id"},
                 {"colSourceName":"#time","colTargetName":"#time"},
                 {"colSourceName":"#event_name","colTargetName":"#event_name"},
                 {"colSourceName":"#distinct_id","colTargetName":"#distinct_id"},
                 {"colSourceName":"event_previous_timestamp","colTargetName":"event_previous_timestamp"},
                 {"colSourceName":"user_first_touch_timestamp","colTargetName":"user_first_touch_timestamp"},
                 {"colSourceName":"event_date", "colTargetName": "event_date"},
                 {"colSourceName":"event_bundle_sequence_id", "colTargetName": "event_bundle_sequence_id"},
                 {"colSourceName":"event_server_timestamp_offset","colTargetName": "event_server_timestamp_offset"},
                 {"colSourceName":"stream_id","colTargetName": "stream_id"},
                 {"colSourceName":"platform","colTargetName": "platform"}
                 ]
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   I ran it through the SeaTunnelApiExample class in the seatunnel-flink-connector-v2-example module, using my own configuration file.
   ```
   
   
   ### Error Exception
   
   ```log
   See above for details
   ```
   
   
   ### Flink or Spark Version
   
   Using Local Mode in IDEA
   
   ### Java or Scala Version
   
   Java 8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

