[ 
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083380#comment-16083380
 ] 

yuemeng commented on FLINK-7151:
--------------------------------

[~fhueske]
Thanks for your reply.
I will split this issue into two issues with more specific titles. Thank you.
[~jark]
Users can use a temporary function to create a temporary UDF or UDAF for their
own job, just like in Hive.
Suppose a user wants to use a special UDF (not a built-in function) for their
own job. What should we do?
Maybe we can create a temporary function for this job.
{code}
Hive:
create temporary function mytest as 'test.udf.ToLowerCase'; 
select mytest(test.name) from test; 
{code}
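
To make the idea concrete, here is a minimal Java sketch of what a session-scoped function registry could look like. It is only an illustration: `TemporaryFunctionRegistry`, `createTemporaryFunction`, `call`, and the nested `ToLowerCase` class are hypothetical names, not Flink or Hive APIs, and the sketch assumes single-argument string functions.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical session-scoped registry; not part of Flink or Hive. */
public class TemporaryFunctionRegistry {

    /** Stand-in for a user class such as 'test.udf.ToLowerCase'. */
    public static class ToLowerCase implements Function<String, String> {
        @Override
        public String apply(String value) {
            return value == null ? null : value.toLowerCase(Locale.ROOT);
        }
    }

    // Functions live only as long as this registry (one per job/session).
    private final Map<String, Function<String, String>> functions = new HashMap<>();

    /** Mirrors: CREATE TEMPORARY FUNCTION name AS 'className'. */
    @SuppressWarnings("unchecked")
    public void createTemporaryFunction(String name, String className) {
        try {
            // Load the user class by name and instantiate it reflectively.
            Object instance = Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            if (!(instance instanceof Function)) {
                throw new IllegalArgumentException(className + " is not a Function");
            }
            functions.put(name.toLowerCase(Locale.ROOT),
                    (Function<String, String>) instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load " + className, e);
        }
    }

    /** Mirrors: SELECT name(argument). Function names are case-insensitive. */
    public String call(String name, String argument) {
        Function<String, String> function = functions.get(name.toLowerCase(Locale.ROOT));
        if (function == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        return function.apply(argument);
    }

    public static void main(String[] args) {
        TemporaryFunctionRegistry session = new TemporaryFunctionRegistry();
        session.createTemporaryFunction("mytest", ToLowerCase.class.getName());
        System.out.println(session.call("mytest", "FLINK"));
    }
}
```

Because the registry is an ordinary object owned by one job, the function disappears with the session and never touches a shared catalog, which is the property I want from a temporary function.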
A temporary table is a conceptual table, not a physical one. We can use it to
express streaming ETL logic:
{code}
CREATE TABLE kafka_source (
  id INT,
  name VARCHAR(100),
  price INT,
  comment VARCHAR(100)
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = 'xxxxg:9092',
  group_id = 'test'
);

CREATE TABLE tmp (
  id INT,
  name VARCHAR(100),
  price INT
) PROPERTIES (
  category = 'tmp'
);


CREATE TABLE db_sink (
  id INT,
  name VARCHAR(100),
  price INT
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'test',
  url = 'jdbc:mysql://127.0.0.1:3307/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

-- INSERT INTO db_sink SELECT id, name,price FROM kafka_source;
-- INSERT INTO db_sink SELECT id, name, sum(price) FROM kafka_source GROUP BY
--   id, name HAVING sum(price) > 1 AND id < 10;
INSERT INTO tmp SELECT id,name,price + 1.0 FROM kafka_source;
INSERT into db_sink SELECT id,name,price + 1 as price FROM tmp;


{code}
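
As a rough in-memory analogy of why `tmp` does not need to be physical, the Java sketch below treats it as a lazily evaluated intermediate step between source and sink. The class and method names (`TemporaryTableSketch`, `Row`, `tmp`, `dbSink`) are hypothetical, a `java.util.stream.Stream` only stands in for a real unbounded stream, and integer `price + 1` is assumed in both steps.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical analogy only; this is not how Flink plans a query. */
public class TemporaryTableSketch {

    /** One record with the columns shared by tmp and db_sink. */
    public static final class Row {
        final int id;
        final String name;
        final int price;

        Row(int id, String name, int price) {
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    /** INSERT INTO tmp SELECT id, name, price + 1 FROM kafka_source. */
    static Stream<Row> tmp(Stream<Row> kafkaSource) {
        // Nothing is stored here: tmp is just a named step in the pipeline.
        return kafkaSource.map(r -> new Row(r.id, r.name, r.price + 1));
    }

    /** INSERT INTO db_sink SELECT id, name, price + 1 AS price FROM tmp. */
    static List<Row> dbSink(Stream<Row> tmpRows) {
        // Only the sink materializes rows (a List stands in for MySQL).
        return tmpRows
                .map(r -> new Row(r.id, r.name, r.price + 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Row> kafkaSource = Arrays.asList(
                new Row(1, "a", 10),
                new Row(2, "b", 20)).stream();
        for (Row row : dbSink(tmp(kafkaSource))) {
            System.out.println(row.id + "," + row.name + "," + row.price);
        }
    }
}
```

Each record passes through both steps in one chained pipeline, so `tmp` only names an intermediate result and costs no storage.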



> FLINK SQL support create temporary function and table
> -----------------------------------------------------
>
>                 Key: FLINK-7151
>                 URL: https://issues.apache.org/jira/browse/FLINK-7151
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: yuemeng
>
> With support for creating temporary functions and tables, we can register a
> UDF, UDAF, or UDTF using SQL:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com.xxxx.aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
