[
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083380#comment-16083380
]
yuemeng commented on FLINK-7151:
--------------------------------
[~fhueske]
Thanks for your reply.
I will split this issue into two separate issues, each with a specific title. Thank you.
[~jark]
Users can use a temporary function to create a temporary UDF or UDAF for their
own job, just like in Hive.
Suppose a user wants to use a special UDF (not a built-in function) for their own
special job. What should we do?
Maybe we can create a temporary function for this job.
{code}
-- Hive:
create temporary function mytest as 'test.udf.ToLowerCase';
select mytest(test.name) from test;
{code}
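To make the idea more concrete, here is a rough sketch of what such a UDF and a job-scoped lookup could look like. Everything in it is an assumption for illustration only: `TemporaryFunctionSketch`, `Registry`, `create` and `call` are hypothetical names, not existing Flink classes, and the UDF is a plain class so that the example runs without Flink on the classpath. In Flink itself the UDF would extend `org.apache.flink.table.functions.ScalarFunction`, which also relies on a public `eval` method.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class TemporaryFunctionSketch {

    // Hypothetical UDF, standing in for 'test.udf.ToLowerCase' from the Hive example.
    public static class ToLowerCase {
        public String eval(String value) {
            return value == null ? null : value.toLowerCase(Locale.ROOT);
        }
    }

    // Hypothetical registry that lives only as long as one job (not a Flink class).
    public static class Registry {
        private final Map<String, Object> functions = new HashMap<>();

        // Mirrors: CREATE TEMPORARY FUNCTION <name> AS '<className>';
        public void create(String name, String className) {
            try {
                Object instance = Class.forName(className)
                        .getDeclaredConstructor()
                        .newInstance();
                // Function names are treated as case-insensitive in this sketch.
                functions.put(name.toLowerCase(Locale.ROOT), instance);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load function class " + className, e);
            }
        }

        // Invokes eval(String) on the function registered under the given name.
        public String call(String name, String argument) {
            Object function = functions.get(name.toLowerCase(Locale.ROOT));
            if (function == null) {
                throw new IllegalArgumentException("Unknown temporary function: " + name);
            }
            try {
                Method eval = function.getClass().getMethod("eval", String.class);
                return (String) eval.invoke(function, argument);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot invoke eval on " + name, e);
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.create("mytest", ToLowerCase.class.getName());
        System.out.println(registry.call("mytest", "Hello Flink")); // prints "hello flink"
    }
}
```

Because the registry is created per job and never persisted, the function disappears when the job ends, which is the behaviour a temporary function is meant to have.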
A temporary table is a conceptual table, not a physical one. We can use it to
express streaming ETL logic:
{code}
CREATE TABLE kafka_source (
id INT,
name VARCHAR(100),
price INT,
comment VARCHAR(100)
) PROPERTIES (
category = 'source',
type = 'kafka',
version = '0.9.0.1',
separator = ',',
topic = 'test',
brokers = 'xxxxg:9092',
group_id = 'test'
);
CREATE TABLE tmp (
id INT,
name VARCHAR(100),
price INT
) PROPERTIES (
category = 'tmp'
);
CREATE TABLE db_sink (
id INT,
name VARCHAR(100),
price INT
) PROPERTIES (
category = 'sink',
type = 'mysql',
table_name = 'test',
url = 'jdbc:mysql://127.0.0.1:3307/ds?useUnicode=true&characterEncoding=UTF8',
username = 'ds_dev',
password = 's]k51_(>R'
);
-- INSERT INTO db_sink SELECT id, name, price FROM kafka_source;
-- INSERT INTO db_sink SELECT id, name, sum(price) FROM kafka_source GROUP BY
-- id, name HAVING sum(price) > 1 AND id < 10;
INSERT INTO tmp SELECT id,name,price + 1.0 FROM kafka_source;
INSERT into db_sink SELECT id,name,price + 1 as price FROM tmp;
{code}
> FLINK SQL support create temporary function and table
> -----------------------------------------------------
>
> Key: FLINK-7151
> URL: https://issues.apache.org/jira/browse/FLINK-7151
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: yuemeng
>
> With support for creating temporary functions and tables, we can register a
> UDF, UDAF, or UDTF using SQL:
> {code}
> CREATE TEMPORARY function 'TOPK' AS
> 'com.xxxx.aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP
> BY id;
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)