[
https://issues.apache.org/jira/browse/FLINK-29890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693105#comment-17693105
]
dalongliu commented on FLINK-29890:
-----------------------------------
[~charles-tan] Yes, Please see the issue
https://issues.apache.org/jira/browse/FLINK-29890 description, we need the
runtime to provide a generic classloader to fix the problem. Currently, you can
work around it by changing your code as follows:
{code:java}
@Test
public void shouldGenerateFlinkJobForInteractiveQueryWithUDFSuccessfully()
throws Exception {
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().build();
final MutableURLClassLoader userClassLoader =
FlinkUserCodeClassLoaders.create(
new URL[0], settings.getUserClassLoader(),
settings.getConfiguration());
StreamExecutionEnvironment streamExecEnv =
new StreamExecutionEnvironment(settings.getConfiguration(),
userClassLoader);
final Executor executor =
StreamTableEnvironmentImpl.lookupExecutor(userClassLoader,
streamExecEnv);
TableConfig tableConfig = TableConfig.getDefault();
tableConfig.setRootConfiguration(executor.getConfiguration());
tableConfig.addConfiguration(settings.getConfiguration());
final ResourceManager resourceManager =
new ResourceManager(settings.getConfiguration(), userClassLoader);
final ModuleManager moduleManager = new ModuleManager();
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
.executionConfig(streamExecEnv.getConfig())
.build();
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, resourceManager, catalogManager,
moduleManager);
final Planner planner =
PlannerFactoryUtil.createPlanner(
executor,
tableConfig,
userClassLoader,
moduleManager,
catalogManager,
functionCatalog);
final StreamTableEnvironment tEnv =new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
resourceManager,
functionCatalog,
tableConfig,
streamExecEnv,
planner,
executor,
settings.isStreamingMode());
final Path jarPath = Paths.get("", "user-functions.jar");
final String jarPathString = String.format("%s%s", "file://",
jarPath.toAbsolutePath());
final String functionClassName = "util.LowerCase";
tEnv.executeSql(
String.format(
"create temporary system function LowerCase as '%s' using
jar '%s'",
functionClassName, jarPathString));
final Table table =
tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())),
row(1, "ABC"),
row(2L, "ABCDE"));
final CloseableIterator<Row> iter =
tEnv.sqlQuery("SELECT LowerCase(name) as name FROM " +
table).execute().collect();
final List<Row> list = new ArrayList<>();
iter.forEachRemaining(list::add);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
System.out.println(list);
} {code}
It just looks a bit hack.
> UDFs classloading from JARs in 1.16 is broken
> ---------------------------------------------
>
> Key: FLINK-29890
> URL: https://issues.apache.org/jira/browse/FLINK-29890
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: Alexander Fedulov
> Priority: Major
>
> 1.16 introduced a lot of changes with respect to classloading in the Table
> API. The way UDFs could previously be loaded from JARs in 1.15 does not work
> in 1.16 anymore - it fails with the ClassNotFound exception when UDFs are
> used at runtime.
> Here is a repository with a reproducible example:
> [https://github.com/afedulov/udfs-flink-1.16/blob/main/src/test/java/com/example/UDFTest.java]
>
> It works as is (Flink 1.15.2) and fails when switching the dependencies to
> 1.16.0.
> Here are some of the PRs that might be related to the issue:
> [https://github.com/apache/flink/pull/20001]
> [https://github.com/apache/flink/pull/19845]
> [https://github.com/apache/flink/pull/20211] (fixes a similar issue
> introduced after classloading changes in 1.16)
>
> It is unclear how UDFs can be loaded from JARs in 1.16.
> Ideally, this should be covered by tests and described in the documentation.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)