[ 
https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376670#comment-17376670
 ] 

frank wang commented on FLINK-14055:
------------------------------------

[~ZhenqiuHuang],we implement CatalogFactory interface and 
FunctionDefinitionFactory interface 

and the core code like this

 @Override
public FunctionDefinition createFunctionDefinition(String name, CatalogFunction 
catalogFunction) {
        // Currently only handles Java class-based functions
        Object func;
        String classNameInnter = catalogFunction.getClassName().substring(0, 
catalogFunction.getClassName().lastIndexOf("."))+"$"+catalogFunction.getClassName().substring(catalogFunction.getClassName().lastIndexOf(".")
 + 1);
        HdfsClassLoader hdfsClassLoader = null;
        try {
                hdfsClassLoader = 
HdfsClassLoader.getClazz(catalogFunction.getProperties().get("JAR_PATH"));
                func = hdfsClassLoader.loadClass(classNameInnter).newInstance();
        } catch (Exception e) {
                try {
                        func = 
hdfsClassLoader.loadClass(catalogFunction.getClassName()).newInstance();
                } catch (Exception ie) {
                        try {
                                func = 
Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance();
                        } catch (Exception iie) {
                                throw new IllegalStateException(
                                                String.format("Failed 
instantiating '%s'", catalogFunction.getClassName())
                                );
                        }
                }
        }

        UserDefinedFunction udf = (UserDefinedFunction) func;

        if (udf instanceof ScalarFunction) {
                return new ScalarFunctionDefinition(
                                name,
                                (ScalarFunction) udf
                );
        } else if (udf instanceof TableFunction) {
                TableFunction t = (TableFunction) udf;
                return new TableFunctionDefinition(
                                name,
                                t,
                                t.getResultType()
                );
        } else if (udf instanceof AggregateFunction) {
                AggregateFunction a = (AggregateFunction) udf;

                return new AggregateFunctionDefinition(
                                name,
                                a,
                                a.getAccumulatorType(),
                                a.getResultType()
                );
        } else if (udf instanceof TableAggregateFunction) {
                TableAggregateFunction a = (TableAggregateFunction) udf;

                return new TableAggregateFunctionDefinition(
                                name,
                                a,
                                a.getAccumulatorType(),
                                a.getResultType()
                );
        } else {
                throw new UnsupportedOperationException(
                                String.format("Function %s should be of 
ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", 
catalogFunction.getClassName())
                );
        }
}

 

 

> Add advanced function DDL syntax "USING JAR/FILE/ACHIVE"
> --------------------------------------------------------
>
>                 Key: FLINK-14055
>                 URL: https://issues.apache.org/jira/browse/FLINK-14055
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Bowen Li
>            Priority: Major
>              Labels: auto-unassigned, sprint
>
> As FLINK-7151 adds basic function DDL to Flink, this ticket is to support 
> dynamically loading functions from external source in function DDL with 
> advanced syntax like 
>  
> {code:java}
> CREATE FUNCTION func_name as class_name USING JAR/FILE/ACHIEVE 'xxx' [, 
> JAR/FILE/ACHIEVE 'yyy'] ;
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to