Hi guys,
I'm trying to add a UDF (user-defined function) to the Flink Table API,
specifically to the DataSet (batch) Table API.
My example code is as follows:
---------------------------
/**
 * Minimal batch Table API example that tries to call a user-defined scalar
 * function (`MyAdd1`) from a SQL query over a small in-memory data set.
 */
object WordCountTable {
  /** A word together with the number attached to it. */
  case class WC(word: String, num: Int)

  /**
   * Builds a three-element data set, registers it as table "WC", adds
   * `MyAdds.MyAdd1#eval` to the default schema under the name "MyAdd1",
   * and prints the result of a SQL query that calls that function.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", 3))
    val table = input.toTable(tEnv)

    // expose the table to SQL queries under the name "WC"
    tEnv.registerTable("WC", table)

    // earlier attempt, left disabled (note: `tblEnv` is not defined in this snippet):
    //val funcCatalog = tblEnv.getFunctionCatalog
    //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1])

    // NOTE(review): per the reported stack trace, SQL validation fails with
    // "No match found for function signature MyAdd1(<NUMERIC>)" even with this
    // registration; presumably the validator does not look functions up in
    // this schema -- confirm against FlinkPlannerImpl.
    val schema = tEnv.getFrameworkConfig.getDefaultSchema
    schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1], "eval"))

    // The query must be a single-line literal: a plain "..." string cannot
    // span lines in Scala, so the hard-wrapped form did not compile.
    val result = tEnv.sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num > 1")
    result.toDataSet[WC].print()
  }
}

And MyAdds looks like this:
---------------------------
/**
 * Container for the user-defined functions used by the example.
 */
public class MyAdds {
    /**
     * Scalar function that adds one to its integer argument.
     */
    public static class MyAdd1 {
        /**
         * Increments the given value by one.
         *
         * @param x the value to increment
         * @return {@code x + 1}
         */
        public int eval(int x) {
            final int incremented = 1 + x;
            return incremented;
        }
    }
}

But when I run it, I get the following error:
Exception in thread "main" org.apache.calcite.tools.ValidationException:
org.apache.calcite.runtime.CalciteContextException: From line 1, column 14
to line 1, column 24: No match found for function signature
MyAdd1(<NUMERIC>)
at
org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
at
org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130)
at
org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54)
at org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 14 to line 1, column 24: No match found for function signature
MyAdd1(<NUMERIC>)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523)
at
org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
found for function signature MyAdd1(<NUMERIC>)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514)
... 36 more


I'm not sure whether this is the right way to add a UDF to the Table API.
Am I missing something? Thanks!

Reply via email to