[ 
https://issues.apache.org/jira/browse/FLINK-29890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693105#comment-17693105
 ] 

dalongliu commented on FLINK-29890:
-----------------------------------

[~charles-tan] Yes, Please see the issue 
https://issues.apache.org/jira/browse/FLINK-29890 description, we need the 
runtime to provide a generic classloader to fix the problem. Currently, you can 
work around it by changing your code as follows:
{code:java}
@Test
public void shouldGenerateFlinkJobForInteractiveQueryWithUDFSuccessfully() 
throws Exception {
    final EnvironmentSettings settings = 
EnvironmentSettings.newInstance().build();
    final MutableURLClassLoader userClassLoader =
            FlinkUserCodeClassLoaders.create(
                    new URL[0], settings.getUserClassLoader(), 
settings.getConfiguration());

    StreamExecutionEnvironment streamExecEnv =
            new StreamExecutionEnvironment(settings.getConfiguration(), 
userClassLoader);
    final Executor executor =
            StreamTableEnvironmentImpl.lookupExecutor(userClassLoader, 
streamExecEnv);

    TableConfig tableConfig = TableConfig.getDefault();
    tableConfig.setRootConfiguration(executor.getConfiguration());
    tableConfig.addConfiguration(settings.getConfiguration());

    final ResourceManager resourceManager =
            new ResourceManager(settings.getConfiguration(), userClassLoader);
    final ModuleManager moduleManager = new ModuleManager();

    final CatalogManager catalogManager =
            CatalogManager.newBuilder()
                    .classLoader(userClassLoader)
                    .config(tableConfig)
                    .defaultCatalog(
                            settings.getBuiltInCatalogName(),
                            new GenericInMemoryCatalog(
                                    settings.getBuiltInCatalogName(),
                                    settings.getBuiltInDatabaseName()))
                    .executionConfig(streamExecEnv.getConfig())
                    .build();

    final FunctionCatalog functionCatalog =
            new FunctionCatalog(tableConfig, resourceManager, catalogManager, 
moduleManager);

    final Planner planner =
            PlannerFactoryUtil.createPlanner(
                    executor,
                    tableConfig,
                    userClassLoader,
                    moduleManager,
                    catalogManager,
                    functionCatalog);
    final StreamTableEnvironment tEnv =new StreamTableEnvironmentImpl(
            catalogManager,
            moduleManager,
            resourceManager,
            functionCatalog,
            tableConfig,
            streamExecEnv,
            planner,
            executor,
            settings.isStreamingMode());

    final Path jarPath = Paths.get("", "user-functions.jar");
    final String jarPathString = String.format("%s%s", "file://", 
jarPath.toAbsolutePath());

    final String functionClassName = "util.LowerCase";

    tEnv.executeSql(
            String.format(
                    "create temporary system function LowerCase as '%s' using 
jar '%s'",
                    functionClassName, jarPathString));

    final Table table =
            tEnv.fromValues(
                    DataTypes.ROW(
                            DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                            DataTypes.FIELD("name", DataTypes.STRING())),
                    row(1, "ABC"),
                    row(2L, "ABCDE"));

    final CloseableIterator<Row> iter =
            tEnv.sqlQuery("SELECT LowerCase(name) as name FROM " + 
table).execute().collect();
    final List<Row> list = new ArrayList<>();
    iter.forEachRemaining(list::add);

    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
    System.out.println(list);
} {code}
It just looks a bit hack.

> UDFs classloading from JARs in 1.16 is broken
> ---------------------------------------------
>
>                 Key: FLINK-29890
>                 URL: https://issues.apache.org/jira/browse/FLINK-29890
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: Alexander Fedulov
>            Priority: Major
>
> 1.16 introduced a lot of changes with respect to classloading in the Table 
> API. The way UDFs could previously be loaded from JARs in 1.15 does not work 
> in 1.16 anymore - it fails with the ClassNotFound exception when UDFs are 
> used at runtime. 
> Here is a repository with a reproducible example:
> [https://github.com/afedulov/udfs-flink-1.16/blob/main/src/test/java/com/example/UDFTest.java]
>  
> It works as is (Flink 1.15.2) and fails when switching the dependencies to 
> 1.16.0.
> Here are some of the PRs that might be related to the issue:
> [https://github.com/apache/flink/pull/20001]
> [https://github.com/apache/flink/pull/19845]
> [https://github.com/apache/flink/pull/20211] (fixes a similar issue 
> introduced after classloading changes in 1.16)
>  
> It is unclear how UDFs can be loaded from JARs in 1.16.
> Ideally, this should be covered by tests and described in the documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to