[
https://issues.apache.org/jira/browse/FLINK-6355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979723#comment-15979723
]
Kaibo Zhou commented on FLINK-6355:
-----------------------------------
Hi, [~fhueske]
The previous version of the example code did not fully express what I meant. I
have updated the example and added some background information.
Thanks
> TableEnvironment support register TableFunction
> -----------------------------------------------
>
> Key: FLINK-6355
> URL: https://issues.apache.org/jira/browse/FLINK-6355
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> Currently {{TableEnvironment}} only supports registering a {{ScalarFunction}},
> so Java users cannot use {{TableEnvironment}} to register a {{TableFunction}}.
> This is the test code:
> {code:title=TableFunc0.java|borderStyle=solid}
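> import org.apache.flink.table.functions.TableFunction;
>
> public class TableFunc0 extends TableFunction<String> {
>     // Splits the input on spaces and emits one row per word.
>     public void eval(String str) {
>         for (String s : str.split(" ")) {
>             collect(s);
>         }
>     }
> }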
> {code}
> {code:title=WordCountExample.java|borderStyle=solid}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.Types;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sources.CsvTableSource;
> import org.apache.flink.table.sources.TableSource;
> import org.apache.flink.types.Row;
> public class WordCountExample {
>     public void build(TableEnvironment tableEnv, java.util.Properties properties) {
>         TableSource<Row> tableSource = new CsvTableSource("/path/to/csv/",
>             new String[]{"a", "b"},
>             new TypeInformation[]{Types.STRING(), Types.STRING()});
>         tableEnv.registerTableSource("csv_source", tableSource);
>         TableFunc0 tableFunc0 = new TableFunc0();
>         // expected
>         tableEnv.registerFunction("func0", tableFunc0); // compile error here
>         // not expected
>         // this also works, but is not what we wanted
>         ((StreamTableEnvironment) tableEnv).registerFunction("func0", tableFunc0);
>         tableEnv.scan("csv_source")
>             .select("a, b")
>             .join("func0(a) as c")
>             .select("b, c")
>             .writeToSink(new CsvTableSink("/path/to/result", ","));
>     }
> }
> {code}
> The call to {{tableEnv.registerFunction}} fails to compile with: "Found
> xx.xxx.TableFunc0,required org.apache.flink.table.functions.ScalarFunction".
> In my testing, only Java users have this problem.
> ----
> Why does the user interface expose only {{TableEnvironment}}, and not
> {{StreamTableEnvironment}} or {{BatchTableEnvironment}}?
> We want to eliminate most of the differences between the batch and the stream
> execution environments, and support writing programs against
> {{TableEnvironment}} only. Users then only need to write one program, and can
> use command parameters to decide whether it runs in the batch or the stream
> processing environment. This also plays to Flink's strength of unifying batch
> and stream processing.
>
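The compile error quoted above comes from how Java resolves overloads: it looks only at the methods declared on the static type of the receiver. The sketch below reproduces that situation without Flink. Every class in it is a hypothetical stand-in that only borrows the names from the issue, under the assumption that the base type declares the ScalarFunction overload and the stream subtype adds the TableFunction one. It is not the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; these are NOT the Flink classes.
class RegisterFunctionSketch {

    static abstract class ScalarFunction {}

    static abstract class TableFunction<T> {
        private final List<T> collected = new ArrayList<>();
        protected void collect(T value) { collected.add(value); }
    }

    // Assumed shape of the base type: only the ScalarFunction overload exists.
    static class TableEnvironment {
        final List<String> registered = new ArrayList<>();
        void registerFunction(String name, ScalarFunction f) {
            registered.add("scalar:" + name);
        }
    }

    // Assumed shape of the subtype: it adds the TableFunction overload.
    static class StreamTableEnvironment extends TableEnvironment {
        <T> void registerFunction(String name, TableFunction<T> f) {
            registered.add("table:" + name);
        }
    }

    static class TableFunc0 extends TableFunction<String> {
        public void eval(String str) {
            for (String s : str.split(" ")) {
                collect(s);
            }
        }
    }

    static List<String> demo() {
        TableEnvironment tableEnv = new StreamTableEnvironment();
        TableFunc0 func0 = new TableFunc0();
        // tableEnv.registerFunction("func0", func0);
        // ^ does not compile: the static type TableEnvironment declares only
        //   the ScalarFunction overload, whatever the runtime type is.
        ((StreamTableEnvironment) tableEnv).registerFunction("func0", func0);
        return tableEnv.registered;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [table:func0]
    }
}
```

In this sketch, declaring the TableFunction overload on the base stand-in would make the cast unnecessary, which is the behaviour the issue asks for.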
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)