Hi guys,

I'm trying to add a UDF in the Flink Table API, specifically the DataSet Table API. My example code is as follows:

---------------------------
object WordCountTable {

  case class WC(word: String, num: Int)

  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", 3))
    val table = input.toTable(tEnv)
    tEnv.registerTable("WC", table)

    //val funcCatalog = tblEnv.getFunctionCatalog
    //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1])
    val schema = tEnv.getFrameworkConfig.getDefaultSchema
    schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1], "eval"))

    tEnv.sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num > 1").toDataSet[WC].print()
  }
}

And MyAdds looks like this:

---------------------------
public class MyAdds {
  public static class MyAdd1 {
    public int eval(int x) {
      return x + 1;
    }
  }
}

But when I run it, I get this error:

Exception in thread "main" org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: From line 1, column 14 to line 1, column 24: No match found for function signature MyAdd1(<NUMERIC>)
    at org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
    at org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130)
    at org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54)
    at org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 14 to line 1, column 24: No match found for function signature MyAdd1(<NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
    at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523)
    at org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95)
    ... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature MyAdd1(<NUMERIC>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514)
    ... 36 more

I'm not sure whether this is the right way to add a UDF in the Table API, or am I missing something?

Thanks~
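
P.S. To rule out the UDF class itself, below is a minimal standalone sketch with no Flink or Calcite involved. It resolves eval(int) by reflection and invokes it, which I assume is roughly how ScalarFunctionImpl.create locates the method (I have not verified that against the Calcite source). The class name MyAddsCheck and the helper callEval are hypothetical names made up for this check; MyAdd1 is copied from the code above.

```java
import java.lang.reflect.Method;

public class MyAddsCheck {

    // Same UDF as in the post, nested here so the check is self-contained.
    public static class MyAdd1 {
        public int eval(int x) {
            return x + 1;
        }
    }

    // Hypothetical helper: looks up eval(int) reflectively and invokes it
    // on a fresh MyAdd1 instance. Throws NoSuchMethodException if the
    // method is not a public eval(int).
    static int callEval(int x) throws Exception {
        Method m = MyAdd1.class.getMethod("eval", int.class);
        Object udf = MyAdd1.class.getDeclaredConstructor().newInstance();
        return (Integer) m.invoke(udf, x);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callEval(2)); // prints 3
    }
}
```

If this runs and prints 3, the class and its eval(int) method are at least reflectively reachable, so the problem is more likely in how (or where) the function gets registered than in MyAdd1 itself.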