[
https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376670#comment-17376670
]
frank wang commented on FLINK-14055:
------------------------------------
[~ZhenqiuHuang],we implement CatalogFactory interface and
FunctionDefinitionFactory interface
and the core code like this
@Override
public FunctionDefinition createFunctionDefinition(String name, CatalogFunction
catalogFunction) {
// Currently only handles Java class-based functions
Object func;
String classNameInnter = catalogFunction.getClassName().substring(0,
catalogFunction.getClassName().lastIndexOf("."))+"$"+catalogFunction.getClassName().substring(catalogFunction.getClassName().lastIndexOf(".")
+ 1);
HdfsClassLoader hdfsClassLoader = null;
try {
hdfsClassLoader =
HdfsClassLoader.getClazz(catalogFunction.getProperties().get("JAR_PATH"));
func = hdfsClassLoader.loadClass(classNameInnter).newInstance();
} catch (Exception e) {
try {
func =
hdfsClassLoader.loadClass(catalogFunction.getClassName()).newInstance();
} catch (Exception ie) {
try {
func =
Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance();
} catch (Exception iie) {
throw new IllegalStateException(
String.format("Failed
instantiating '%s'", catalogFunction.getClassName())
);
}
}
}
UserDefinedFunction udf = (UserDefinedFunction) func;
if (udf instanceof ScalarFunction) {
return new ScalarFunctionDefinition(
name,
(ScalarFunction) udf
);
} else if (udf instanceof TableFunction) {
TableFunction t = (TableFunction) udf;
return new TableFunctionDefinition(
name,
t,
t.getResultType()
);
} else if (udf instanceof AggregateFunction) {
AggregateFunction a = (AggregateFunction) udf;
return new AggregateFunctionDefinition(
name,
a,
a.getAccumulatorType(),
a.getResultType()
);
} else if (udf instanceof TableAggregateFunction) {
TableAggregateFunction a = (TableAggregateFunction) udf;
return new TableAggregateFunctionDefinition(
name,
a,
a.getAccumulatorType(),
a.getResultType()
);
} else {
throw new UnsupportedOperationException(
String.format("Function %s should be of
ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction",
catalogFunction.getClassName())
);
}
}
> Add advanced function DDL syntax "USING JAR/FILE/ACHIVE"
> --------------------------------------------------------
>
> Key: FLINK-14055
> URL: https://issues.apache.org/jira/browse/FLINK-14055
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Bowen Li
> Priority: Major
> Labels: auto-unassigned, sprint
>
> As FLINK-7151 adds basic function DDL to Flink, this ticket is to support
> dynamically loading functions from external source in function DDL with
> advanced syntax like
>
> {code:java}
> CREATE FUNCTION func_name as class_name USING JAR/FILE/ACHIEVE 'xxx' [,
> JAR/FILE/ACHIEVE 'yyy'] ;
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)