[ https://issues.apache.org/jira/browse/FLINK-29890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636291#comment-17636291 ]

dalongliu commented on FLINK-29890:
-----------------------------------

[~afedulov] Sorry for the late response. I have read your test case: you use the 
Table API and register a UDF from your custom jar. Before 1.16 we did not 
support the `add jar` or `create function ... using jar` syntax in the Table 
API, so you had to create a `URLClassLoader` that contained the UDF class and 
then replace the thread context classloader with it. Since 1.16 we have 
introduced a `MutableURLClassLoader` in the table module; this classloader 
manages all user jars, such as connector or UDF jars. We can add a jar to the 
`MutableURLClassLoader` via the `add jar` or `create function ... using jar` 
clause, which helps avoid class conflicts in the table module.
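
For reference, the pre-1.16 workaround mentioned above looked roughly like the 
sketch below (illustrative only, reusing the jar path, UDF class name, and 
`tEnv` from the example further down; `UserDefinedFunction` is 
`org.apache.flink.table.functions.UserDefinedFunction`):
{code:java}
// Pre-1.16 sketch: load the jar manually and swap in a classloader that can see
// the UDF class before registering it. This is no longer needed since 1.16.
final URL jarUrl = Paths.get(TEST_FUNCTIONS_LOCATION, "user-functions.jar").toUri().toURL();
final URLClassLoader udfClassLoader =
        new URLClassLoader(new URL[] {jarUrl}, Thread.currentThread().getContextClassLoader());
Thread.currentThread().setContextClassLoader(udfClassLoader);

// Register the UDF class loaded from the custom classloader.
final Class<?> udfClass = udfClassLoader.loadClass("util.LowerCase");
tEnv.createTemporarySystemFunction(
        "LowerCase", (Class<? extends UserDefinedFunction>) udfClass);
{code}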

Regarding how the UDF class can be loaded from a jar in 1.16: you can use the 
`add jar` or `create function ... using jar` syntax; see [1], [2], or FLIP-214 
for more details. For your use case, you no longer need to create a new 
classloader and set it as the thread context classloader in 1.16; you can 
simplify your code as follows:
{code:java}
@Test
public void shouldGenerateFlinkJobForInteractiveQueryWithUDFSuccessfully() throws Exception {
    // Path to the jar that contains the UDF class.
    final Path jarPath = Paths.get(TEST_FUNCTIONS_LOCATION, "user-functions.jar");
    final String jarPathString = String.format("%s%s", "file://", jarPath.toAbsolutePath());

    final Configuration config = new Configuration();
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(config);
    final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    final String functionClassName = "util.LowerCase";

    // Register the UDF directly from the jar; no custom classloader is needed.
    tEnv.executeSql(
            String.format(
                    "create temporary system function LowerCase as '%s' using jar '%s'",
                    functionClassName, jarPathString));

    final Table table =
            tEnv.fromValues(
                    DataTypes.ROW(
                            DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                            DataTypes.FIELD("name", DataTypes.STRING())),
                    row(1, "ABC"),
                    row(2L, "ABCDE"));

    // Apply the UDF in a query and collect the results.
    final CloseableIterator<Row> iter =
            tEnv.sqlQuery("SELECT LowerCase(name) as name FROM " + table)
                    .execute()
                    .collect();
    final List<Row> list = new ArrayList<>();
    iter.forEachRemaining(list::add);

    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
    System.out.println(list);
}
{code}
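
As an alternative (just a sketch, not a full test), you could also add the jar 
first via the `add jar` statement from [2] and then register the function 
without the `using jar` clause, reusing the same `jarPathString` and 
`functionClassName` instead of the `create ... using jar` call above:
{code:java}
// Sketch: add the jar to the session's MutableURLClassLoader first,
// then register the UDF by class name.
tEnv.executeSql(String.format("ADD JAR '%s'", jarPathString));
tEnv.executeSql(String.format(
        "CREATE TEMPORARY SYSTEM FUNCTION LowerCase AS '%s'", functionClassName));
{code}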
 
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/jar/#add-jar

> UDFs classloading from JARs in 1.16 is broken
> ---------------------------------------------
>
>                 Key: FLINK-29890
>                 URL: https://issues.apache.org/jira/browse/FLINK-29890
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: Alexander Fedulov
>            Priority: Critical
>
> 1.16 introduced a lot of changes with respect to classloading in the Table 
> API. The way UDFs could previously be loaded from JARs in 1.15 no longer 
> works in 1.16 - it fails with a ClassNotFoundException when UDFs are used 
> at runtime. 
> Here is a repository with a reproducible example:
> [https://github.com/afedulov/udfs-flink-1.16/blob/main/src/test/java/com/example/UDFTest.java]
>  
> It works as is (Flink 1.15.2) and fails when switching the dependencies to 
> 1.16.0.
> Here are some of the PRs that might be related to the issue:
> [https://github.com/apache/flink/pull/20001]
> [https://github.com/apache/flink/pull/19845]
> [https://github.com/apache/flink/pull/20211] (fixes a similar issue 
> introduced after classloading changes in 1.16)
>  
> It is unclear how UDFs can be loaded from JARs in 1.16.
> Ideally, this should be covered by tests and described in the documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
