wuchong commented on code in PR #20211:
URL: https://github.com/apache/flink/pull/20211#discussion_r926684832
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java:
##########
@@ -34,4 +34,29 @@ public interface FunctionDefinitionFactory {
* @return a {@link FunctionDefinition}
*/
FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
Review Comment:
Please give the old method a default implementation, so that users no longer
need to override it, and add a deprecation note on the old method that tells
users to implement the new method instead.
https://github.com/apache/flink/pull/4616/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R51
is a good example of how to evolve a method this way.
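A minimal sketch of this pattern, with hypothetical names (`NamedFactory`, `FactoryContext`, `LegacyFactory`, `ContextAwareFactory`) standing in for the real Flink types. It assumes the old method's default simply throws with a message pointing at the new method; that choice is an assumption for illustration, not necessarily what this PR should do:

```java
/** Hypothetical stand-in for the Context in this PR: it only exposes a class loader. */
interface FactoryContext {
    ClassLoader getClassLoader();
}

/** Hypothetical factory interface whose single method is being evolved. */
interface NamedFactory {

    /**
     * Old method. It now has a default body, so new implementations can ignore it.
     *
     * @deprecated Implement {@link #create(String, FactoryContext)} instead.
     */
    @Deprecated
    default String create(String name) {
        // Assumption: fail loudly if neither method was overridden.
        throw new UnsupportedOperationException(
                "Please implement create(String, FactoryContext) instead.");
    }

    /** New method. The default delegates to the old one so legacy implementations keep working. */
    default String create(String name, FactoryContext context) {
        return create(name);
    }
}

/** Legacy implementation: overrides only the deprecated method. */
class LegacyFactory implements NamedFactory {
    @Override
    public String create(String name) {
        return "legacy:" + name;
    }
}

/** New implementation: overrides only the context-aware method. */
class ContextAwareFactory implements NamedFactory {
    @Override
    public String create(String name, FactoryContext context) {
        return "new:" + name;
    }
}
```

With this shape, an existing implementation that only overrides the old method is still reached through the new method's default, while a new implementation only has to override the context-aware method.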
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java:
##########
@@ -55,27 +57,56 @@ public HiveFunctionDefinitionFactory(HiveShim hiveShim) {
@Override
public FunctionDefinition createFunctionDefinition(
String name, CatalogFunction catalogFunction) {
- if (catalogFunction.isGeneric()) {
- return createFunctionDefinitionFromFlinkFunction(name, catalogFunction);
+ return createFunctionDefinition(
+ name, catalogFunction, () -> Thread.currentThread().getContextClassLoader());
+ }
+
+ @Override
+ public FunctionDefinition createFunctionDefinition(
+ String name, CatalogFunction catalogFunction, Context context) {
+ if (isGenericFunction(catalogFunction, context.getClassLoader())) {
+ return createFunctionDefinitionFromFlinkFunction(name, catalogFunction, context);
}
- return createFunctionDefinitionFromHiveFunction(name, catalogFunction.getClassName());
+ return createFunctionDefinitionFromHiveFunction(
+ name, catalogFunction.getClassName(), context);
}
public FunctionDefinition createFunctionDefinitionFromFlinkFunction(
- String name, CatalogFunction catalogFunction) {
+ String name, CatalogFunction catalogFunction, Context context) {
return UserDefinedFunctionHelper.instantiateFunction(
- Thread.currentThread().getContextClassLoader(), null, name, catalogFunction);
+ context.getClassLoader(), null, name, catalogFunction);
+ }
+
+ /**
+ * Distinguish if the function is a generic function.
+ *
+ * @return whether the function is a generic function
+ */
+ private boolean isGenericFunction(CatalogFunction catalogFunction, ClassLoader classLoader) {
Review Comment:
We can get rid of the `generic` term now, and call this method
`isFlinkFunction`.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java:
##########
@@ -55,27 +57,56 @@ public HiveFunctionDefinitionFactory(HiveShim hiveShim) {
@Override
public FunctionDefinition createFunctionDefinition(
String name, CatalogFunction catalogFunction) {
- if (catalogFunction.isGeneric()) {
- return createFunctionDefinitionFromFlinkFunction(name, catalogFunction);
+ return createFunctionDefinition(
+ name, catalogFunction, () -> Thread.currentThread().getContextClassLoader());
Review Comment:
We don't need to implement this old method anymore.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##########
@@ -127,13 +122,8 @@ public Class<UDFType> getUDFClass() throws ClassNotFoundException {
* @return the UDF deserialized
*/
private UDFType deserializeUDF() {
- try {
- return (UDFType)
- SerializationUtilities.deserializeObject(
- udfSerializedString, (Class<Serializable>) getUDFClass());
- } catch (ClassNotFoundException e) {
- throw new FlinkHiveUDFException(
- String.format("Failed to deserialize function %s.", className), e);
- }
+ return (UDFType)
+ SerializationUtilities.deserializeObject(
+ udfSerializedString, (Class<Serializable>) getUDFClass());
Review Comment:
Will the deserialization use the classloader of the `functionClz`? Is there
any test to cover this code path?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java:
##########
@@ -34,4 +34,29 @@ public interface FunctionDefinitionFactory {
* @return a {@link FunctionDefinition}
*/
FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+
+ /**
+ * Creates a {@link FunctionDefinition} from given {@link CatalogFunction} with the given {@link
+ * Context} containing the class loader of the current session, which is useful when it's needed
+ * to load class from class name.
+ *
+ * <p>The default implementation will call {@link #createFunctionDefinition(String,
+ * CatalogFunction)} directly.
+ *
+ * @param name name of the {@link CatalogFunction}
+ * @param catalogFunction the catalog function
+ * @param context the {@link Context} for creating function definition
+ * @return a {@link FunctionDefinition}
+ */
+ default FunctionDefinition createFunctionDefinition(
+ String name, CatalogFunction catalogFunction, Context context) {
+ return createFunctionDefinition(name, catalogFunction);
Review Comment:
When calling the old method, we should temporarily replace the thread context
classloader with the user classloader (see
`org.apache.flink.table.client.gateway.context.ExecutionContext#wrapClassLoader`).
Otherwise, classes can't be found by implementations that still use the old
method. Please add a test in the planner to cover this case.
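A rough sketch of the swap I have in mind. `ContextClassLoaderSwap` and `withContextClassLoader` are hypothetical names for illustration, not existing Flink utilities; only the `Thread` calls are standard JDK API:

```java
import java.util.function.Supplier;

/** Hypothetical helper: runs an action with a different thread context classloader. */
final class ContextClassLoaderSwap {

    private ContextClassLoaderSwap() {}

    static <T> T withContextClassLoader(ClassLoader userClassLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        // Remember the current context classloader so it can be restored afterwards.
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            // Code that reads Thread.currentThread().getContextClassLoader() in here
            // (for example an old two-argument factory method) sees the user classloader.
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            thread.setContextClassLoader(original);
        }
    }
}
```

The default implementation of the new method could then wrap its call to the old method in something like `withContextClassLoader(context.getClassLoader(), () -> createFunctionDefinition(name, catalogFunction))`, so that implementations relying on the thread context classloader still resolve user classes.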
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.