[ 
https://issues.apache.org/jira/browse/FLINK-6355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979723#comment-15979723
 ] 

Kaibo Zhou commented on FLINK-6355:
-----------------------------------

Hi, [~fhueske]
The previous version of the example code did not fully express what I meant. I 
have updated the example, and add some background information。

Thanks

> TableEnvironment support register TableFunction
> -----------------------------------------------
>
>                 Key: FLINK-6355
>                 URL: https://issues.apache.org/jira/browse/FLINK-6355
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Currently {{TableEnvironment}} only supports register ScalarFunction, the 
> Java users cannot use {{TableEnvironment}} to register {{TableFunction}}.
> This is the test code:
> {code:title=TableFunc0.java|borderStyle=solid}
> public class TableFunc0 extends TableFunction<String> {
>       public void eval(String str) {
>            for (String s : str.split(" ")) {
>              collect(s);
>            }
>         }
> }
> {code}
> {code:title=WordCountExample.java|borderStyle=solid}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.Types;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sources.CsvTableSource;
> import org.apache.flink.table.sources.TableSource;
> import org.apache.flink.types.Row;
> public class WordCountExample {
>     public void build(TableEnvironment tableEnv, java.util.Properties 
> properties) {
>         TableSource<Row> tableSource = new CsvTableSource("/path/to/csv/", 
> new String[]{"a", "b"}, new TypeInformation[]{Types.STRING(), 
> Types.STRING()});
>         tableEnv.registerTableSource("csv_source", tableSource);
>         TableFunc0 tableFunc0 = new TableFunc0();
>         //  expected
>         tableEnv.registerFunction("func0", tableFunc0); // will compile error 
> here
>         // not expected
>         ((StreamTableEnvironment) tableEnv).registerFunction("func0", 
> tableFunc0); // also can work, but not what we wanted
>         tableEnv.scan("csv_source")
>                 .select("a, b")
>                 .join("func0(a) as c")
>                 .select("b, c")
>                 .writeToSink(new CsvTableSink("/path/to/result", ","));
>     }
> }
> {code}
> There will be compilation errors in tableEnv.registerFunction: "Found 
> xx.xxx.TableFunc0,required org.apache.flink.table.functions.ScalarFunction"。
> I did some testing, only Java users have this problem.
> ----
> Why the user interface only has {{TableEnvironment}}, but not 
> {{StreamTableEnvironment}} or {{BatchStreamTableEnvironment}} ? 
> We want to eliminate most of the differences between the batch and the stream 
> execution environment, and support the use of the {{TableEnvironment}} to 
> write programs only. So that users only need to write a code, and can use the 
> command parameters to determine run in the batch or stream processing 
> environment.This is also the advantage of flink which can unify batch and 
> stream processing.
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to