[
https://issues.apache.org/jira/browse/FLINK-29890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636291#comment-17636291
]
dalongliu commented on FLINK-29890:
-----------------------------------
[~afedulov] Sorry for the later response, I have read your test case, you use
table API and register a udf by your customer jar. Before 1.16, due to we don't
support `add jar` or `create function ... using jar` syntax in table API, so
you have to create a `URLClassLoader` that contains udf class, and then you
replace the thread context classloader. Since 1.16, we introduce an
`MutableURLClassLoader` in table module, this classloader is used to manage all
the customer jars such as connector or udf jar. we can add the jar to
`MutableURLClassLoader` via `add jar` or `create function ... using jar`
clause. This is helpful to avoid class conflict in table module.
Regarding how to udf class can be loaded from jar in 1.16, you can use the `add
jar` or `create function ... using jar` syntax, more details to see [1] or [2]
or FLIP-214. For your use case, you don't need to new a classloader and set it
to thread context classloader in 1.16, you just need to simplify your code as
follows:
{code:java}
@Test
public void shouldGenerateFlinkJobForInteractiveQueryWithUDFSuccessfully()
throws Exception {
final Path jarPath = Paths.get(TEST_FUNCTIONS_LOCATION,
"user-functions.jar");
final String jarPathString = String.format("%s%s", "file://",
jarPath.toAbsolutePath());
final Configuration config = new Configuration();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
final String functionClassName = "util.LowerCase";
tEnv.executeSql(String.format("create temporary system function LowerCase
as '%s' using jar '%s'", functionClassName, jarPathString));
final Table table = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "ABC"),
row(2L, "ABCDE")
);
final CloseableIterator<Row> iter = tEnv.sqlQuery("SELECT LowerCase(name)
as name FROM " + table).execute().collect();
final List<Row> list = new ArrayList<>();
iter.forEachRemaining(list::add);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
System.out.println(list);
} {code}
#
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function]
#
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/jar/#add-jar
> UDFs classloading from JARs in 1.16 is broken
> ---------------------------------------------
>
> Key: FLINK-29890
> URL: https://issues.apache.org/jira/browse/FLINK-29890
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: Alexander Fedulov
> Priority: Critical
>
> 1.16 introduced a lot of changes with respect to classloading in the Table
> API. The way UDFs could previously be loaded from JARs in 1.15 does not work
> in 1.16 anymore - it fails with the ClassNotFound exception when UDFs are
> used at runtime.
> Here is a repository with a reproducible example:
> [https://github.com/afedulov/udfs-flink-1.16/blob/main/src/test/java/com/example/UDFTest.java]
>
> It works as is (Flink 1.15.2) and fails when switching the dependencies to
> 1.16.0.
> Here are some of the PRs that might be related to the issue:
> [https://github.com/apache/flink/pull/20001]
> [https://github.com/apache/flink/pull/19845]
> [https://github.com/apache/flink/pull/20211] (fixes a similar issue
> introduced after classloading changes in 1.16)
>
> It is unclear how UDFs can be loaded from JARs in 1.16.
> Ideally, this should be covered by tests and described in the documentation.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)